soumilshah1995 commented on issue #8977:
URL: https://github.com/apache/hudi/issues/8977#issuecomment-1602559011

   Aditya gave me a JAR file on Slack, and I am trying out the sample code.
   I am still confused: I was expecting it to throw an error when message is null.
   
   Here is sample code for you to replicate:
   ```
   
   # Import directly so a missing dependency fails loudly instead of being
   # printed and followed by a NameError further down.
   import os
   import sys
   
   from pyspark.sql import SparkSession
   
   hudi_version = '0.13.1'  # unused below; the JAR actually loaded is the 0.14.0-SNAPSHOT bundle
   jar_file = 'hudi-spark3.3-bundle_2.12-0.14.0-SNAPSHOT.jar'
   
   os.environ['PYSPARK_SUBMIT_ARGS'] = f"--jars {jar_file} pyspark-shell"
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.jars', jar_file) \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   
   # Create sample data
   
   db_name = "hudidb"
   table_name = "test"
   recordkey = 'uuid'
   precombine = 'precomb'
   
   method = 'upsert'
   table_type = "COPY_ON_WRITE"
   validator_query = "SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL;"
   path = f"file:///C:/tmp/{db_name}/{table_name}"
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine,
       'hoodie.upsert.shuffle.parallelism': 2,
       'hoodie.insert.shuffle.parallelism': 2,
       "hoodie.precommit.validators": "org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator",
       "hoodie.precommit.validators.equality.sql.queries": validator_query
   }
   
   try:
       print("Trying Append 1")
       spark_df = spark.createDataFrame(
           data=[
               (1, "This is APPEND 1", 111, "1"),
               (2, "This is APPEND 2", 222, "2"),
               (3, "This is APPEND 5", 222, "3"),
           ],
           schema=["uuid", "message", "precomb", "partition"])
       spark_df.show()
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
       print("Append 1 Success....")
   except Exception as e:
       print("Failed to UPSERT", e)
   
   try:
       print("Trying Append 2")
       spark_df = spark.createDataFrame(
           data=[
               (4, None, 444, None),
               (5, "This is APPEND 5", 555, "5"),
           ],
           schema=["uuid", "message", "precomb", "partition"])
       spark_df.show()
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
       print("Append 2 Success....")
   
   except Exception as e:
       print("Failed to UPSERT", e)
   
   ```
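
A minimal sketch of why both appends are rejected, assuming the equality validator simply runs the query on the state before the commit and on the state after it, and fails when the two results differ (this is what the Expected/Actual output in the logs below suggests). It is plain Python, not Hudi code: the rows are dicts, and the helper names `count_message_not_null`, `count_message_null` and `equality_check` are hypothetical stand-ins for the SQL queries and the comparison.

```python
# Hypothetical pure-Python model of an "equality" pre-commit check.
# This is NOT Hudi code: rows are dicts and each "query" is a Python function.

def count_message_not_null(rows):
    """Models: SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL"""
    return sum(1 for row in rows if row["message"] is not None)


def count_message_null(rows):
    """Models: SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NULL"""
    return sum(1 for row in rows if row["message"] is None)


def equality_check(query, before_rows, after_rows):
    """Return (passed, expected, actual); passes only if the result is unchanged."""
    expected = query(before_rows)  # result on the state before the commit
    actual = query(after_rows)     # result on the state after the commit
    return expected == actual, expected, actual


append_1 = [
    {"uuid": 1, "message": "This is APPEND 1"},
    {"uuid": 2, "message": "This is APPEND 2"},
    {"uuid": 3, "message": "This is APPEND 5"},
]
append_2 = [
    {"uuid": 4, "message": None},
    {"uuid": 5, "message": "This is APPEND 5"},
]

# IS NOT NULL query: the count grows whenever a non-null row is written,
# so every batch with at least one non-null message changes the result.
not_null_append_1 = equality_check(count_message_not_null, [], append_1)
# The table is still empty here because Append 1 was rejected.
not_null_append_2 = equality_check(count_message_not_null, [], append_2)

# IS NULL query: the count changes only when a batch contains a null message.
null_append_1 = equality_check(count_message_null, [], append_1)
null_append_2 = equality_check(count_message_null, [], append_2)

print(not_null_append_1)  # (False, 0, 3)
print(not_null_append_2)  # (False, 0, 1)
print(null_append_1)      # (True, 0, 0)
print(null_append_2)      # (False, 0, 1)
```

Under that assumption, the `IS NOT NULL` query fails on any write that adds a non-null message, which matches the 0 vs 3 and 0 vs 1 results logged for Append 1 and Append 2. A query on `message IS NULL` would only change when a null is written. Hudi also ships `org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator`, configured through `hoodie.precommit.validators.single.value.sql.queries` with the expected value appended after `#`, which may be closer to the intent here; I have not verified it against this SNAPSHOT JAR.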
   # logs 
   ```
   C:\Users\s.shah\IdeaProjects\DBT\venv\Scripts\python.exe "C:\Users\s.shah\IdeaProjects\DBT\Hudi Ut\sam.py"
   23/06/22 08:31:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
   Setting default log level to "WARN".
   To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
   Trying Append 1
   23/06/22 08:32:01 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
   +----+----------------+-------+---------+
   |uuid|         message|precomb|partition|
   +----+----------------+-------+---------+
   |   1|This is APPEND 1|    111|        1|
   |   2|This is APPEND 2|    222|        2|
   |   3|This is APPEND 5|    222|        3|
   +----+----------------+-------+---------+
   
   23/06/22 08:32:13 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
   23/06/22 08:32:13 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
   23/06/22 08:32:14 WARN HoodieBackedTableMetadata: Metadata table was not found at path file:/C:/tmp/hudidb/test/.hoodie/metadata
   23/06/22 08:32:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
   23/06/22 08:32:21 WARN HoodieTimelineArchiver: Error parsing instant time: 00000000000000010
   # WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
   # WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
   23/06/22 08:32:46 WARN BaseSparkCommitActionExecutor: Cannot get table schema from before state.
   java.lang.IllegalArgumentException: Could not find any data file written for commit, so could not get schema for table file:/C:/tmp/hudidb/test
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:275)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116)
        at 
org.apache.hudi.common.table.TableSchemaResolver.lambda$getTableAvroSchemaInternal$3(TableSchemaResolver.java:206)
        at org.apache.hudi.common.util.Option.orElseGet(Option.java:149)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:205)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:137)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:126)
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromCommittedFiles(SparkValidatorUtils.java:142)
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:83)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
        at 
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   23/06/22 08:32:46 WARN BaseSparkCommitActionExecutor: Use the schema from after state (current transaction) to create the empty Spark dataframe: StructType(StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(uuid,LongType,true),StructField(message,StringType,true),StructField(precomb,LongType,true),StructField(partition,StringType,true))
   23/06/22 08:32:49 ERROR SqlQueryEqualityPreCommitValidator: query validation failed. See stdout for sample query results. Query: SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL
   Expected result (sample records only):
   +--------+
   |count(1)|
   +--------+
   |       0|
   +--------+
   
   Actual result (sample records only):
   +--------+
   |count(1)|
   +--------+
   |       3|
   +--------+
   
   23/06/22 08:32:49 ERROR BaseSparkCommitActionExecutor: validation failed for org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator
   org.apache.hudi.exception.HoodieValidationException: Query validation failed for 'SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL'. See stdout for expected vs actual records
        at 
org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator.validateUsingQuery(SqlQueryEqualityPreCommitValidator.java:73)
        at 
org.apache.hudi.client.validator.SqlQueryPreCommitValidator.lambda$validateRecordsBeforeAndAfter$0(SqlQueryPreCommitValidator.java:69)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1006)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at 
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at 
java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:667)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at 
java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
        at 
org.apache.hudi.client.validator.SqlQueryPreCommitValidator.validateRecordsBeforeAndAfter(SqlQueryPreCommitValidator.java:68)
        at 
org.apache.hudi.client.validator.SparkPreCommitValidator.validate(SparkPreCommitValidator.java:82)
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.lambda$runValidatorAsync$2(SparkValidatorUtils.java:109)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
   23/06/22 08:32:49 ERROR BaseSparkCommitActionExecutor: At least one pre-commit validation failed
   Failed to UPSERT An error occurred while calling o58.save.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230622083213832
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   Caused by: org.apache.hudi.exception.HoodieValidationException: At least one pre-commit validation failed
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:97)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
        at 
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
        ... 46 more
   
   Trying Append 2
   +----+----------------+-------+---------+
   |uuid|         message|precomb|partition|
   +----+----------------+-------+---------+
   |   4|            null|    444|     null|
   |   5|This is APPEND 5|    555|        5|
   +----+----------------+-------+---------+
   
   23/06/22 08:33:28 WARN BaseSparkCommitActionExecutor: Cannot get table schema from before state.
   java.lang.IllegalArgumentException: Could not find any data file written for commit, so could not get schema for table file:/C:/tmp/hudidb/test
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:275)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116)
        at 
org.apache.hudi.common.table.TableSchemaResolver.lambda$getTableAvroSchemaInternal$3(TableSchemaResolver.java:206)
        at org.apache.hudi.common.util.Option.orElseGet(Option.java:149)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:205)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:137)
        at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:126)
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromCommittedFiles(SparkValidatorUtils.java:142)
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:83)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
        at 
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   23/06/22 08:33:28 WARN BaseSparkCommitActionExecutor: Use the schema from after state (current transaction) to create the empty Spark dataframe: StructType(StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(uuid,LongType,true),StructField(message,StringType,true),StructField(precomb,LongType,true),StructField(partition,StringType,true))
   23/06/22 08:33:29 ERROR SqlQueryEqualityPreCommitValidator: query validation failed. See stdout for sample query results. Query: SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL
   Expected result (sample records only):
   +--------+
   |count(1)|
   +--------+
   |       0|
   +--------+
   
   Actual result (sample records only):
   +--------+
   |count(1)|
   +--------+
   |       1|
   +--------+
   
   23/06/22 08:33:29 ERROR BaseSparkCommitActionExecutor: validation failed for org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator
   org.apache.hudi.exception.HoodieValidationException: Query validation failed for 'SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL'. See stdout for expected vs actual records
        at 
org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator.validateUsingQuery(SqlQueryEqualityPreCommitValidator.java:73)
        at 
org.apache.hudi.client.validator.SqlQueryPreCommitValidator.lambda$validateRecordsBeforeAndAfter$0(SqlQueryPreCommitValidator.java:69)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1006)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at 
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at 
java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:667)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at 
java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
        at 
org.apache.hudi.client.validator.SqlQueryPreCommitValidator.validateRecordsBeforeAndAfter(SqlQueryPreCommitValidator.java:68)
        at 
org.apache.hudi.client.validator.SparkPreCommitValidator.validate(SparkPreCommitValidator.java:82)
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.lambda$runValidatorAsync$2(SparkValidatorUtils.java:109)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
   23/06/22 08:33:29 ERROR BaseSparkCommitActionExecutor: At least one 
pre-commit validation failed
   Failed to UPSERT An error occurred while calling o88.save.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for 
commit time 20230622083306135
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   Caused by: org.apache.hudi.exception.HoodieValidationException: At least one 
pre-commit validation failed
        at 
org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:97)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
        at 
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
        ... 46 more
   
   23/06/22 08:33:29 WARN DiskMap: Error while deleting the disk map 
directory=/tmp//hudi-BITCASK-699636ae-d2e5-49f2-89f8-01424ebc6d34
   java.io.IOException: Unable to delete directory 
\tmp\hudi-BITCASK-699636ae-d2e5-49f2-89f8-01424ebc6d34
        at 
org.apache.hudi.common.util.FileIOUtils.deleteDirectory(FileIOUtils.java:59)
        at 
org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:100)
        at 
org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:92)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   23/06/22 08:33:29 WARN DiskMap: Error while deleting the disk map 
directory=/tmp//hudi-BITCASK-6612ee96-763f-4aa4-aa49-6e70e9f54153
   java.io.IOException: Unable to delete directory 
\tmp\hudi-BITCASK-6612ee96-763f-4aa4-aa49-6e70e9f54153
        at 
org.apache.hudi.common.util.FileIOUtils.deleteDirectory(FileIOUtils.java:59)
        at 
org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:100)
        at 
org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:92)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   
   Process finished with exit code 0
   
   ```
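One thing worth noting in the log above: the run ends with `exit code 0` even though the upsert failed with `HoodieValidationException`. That is presumably because the sample script catches the exception and only prints it. A minimal sketch of propagating the failure as a non-zero status instead, assuming a hypothetical `run_write` helper and a stand-in `failing_write` callable (neither is part of Hudi or the original script):

```python
def run_write(write_fn):
    """Run a write callable and return a process-style status code.

    `write_fn` is a hypothetical zero-argument callable that would wrap
    the actual `df.write.format("hudi")...save(path)` call.
    """
    try:
        write_fn()
        return 0
    except Exception as e:
        # Same message shape as in the log, but the failure is reported
        # to the caller instead of being swallowed.
        print("Failed to UPSERT", e)
        return 1


def failing_write():
    # Stand-in for a write rejected by a pre-commit validator.
    raise RuntimeError("At least one pre-commit validation failed")


status = run_write(failing_write)
print("status:", status)
```

Passing `status` to `sys.exit()` at the end of the script would make the process exit non-zero when validation rejects the commit.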
   
   # jar links 
   
https://apache-hudi.slack.com/files/U04U11G3LK0/F05DMRB5KPT/hudi-spark3.3-bundle_2.12-0.14.0-snapshot.jar
   
   CC @ad1happy2go 

