brandonstanleyappfolio opened a new issue, #7627: URL: https://github.com/apache/iceberg/issues/7627
### Query engine

- Spark 3.3.0
- Iceberg 1.2.0
- Debezium 2.0.1

### Question

Hi Iceberg Community,

I am reaching out to ask whether anyone has succeeded in using Spark Structured Streaming (PySpark) jobs to maintain upserted Iceberg tables via the [`MERGE INTO`](https://iceberg.apache.org/docs/1.2.0/spark-writes/#merge-into) SQL command inside a `foreachBatch`. In trying to achieve this, I ran into a couple of issues, and I would like to know if anyone has succeeded or had the same experience.

I was able to perform an initial write to the Iceberg table. However, any subsequent insert, update, or delete of a record resulted in the following exception being thrown:

```
Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [s3a://<PATH_TO_PARQUET_FILE>]
```

I tried to simulate the same functionality using Spark in batch mode and did not get the same error, which leads me to believe this is caused by the `foreachBatch` operation. Any help or guidance would be greatly appreciated!

For additional context, I ran the following steps:

1. Started a PySpark shell:

   ```
   pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0,za.co.absa:abris_2.12:6.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-hadoop-cloud_2.12:3.3.0 \
     --repositories https://packages.confluent.io/maven/ \
     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
     --conf spark.sql.catalog.spark_catalog.type=hadoop \
     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.local.type=hadoop \
     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
     --conf spark.sql.catalog.spark_catalog.warehouse=s3a://<PATH_TO_WAREHOUSE> \
     --conf spark.sql.catalog.spark_catalog.cache-enabled=false \
     --conf spark.sql.catalog.spark_catalog.local.cache-enabled=false
   ```

2. Created an empty Iceberg table on S3:

   ```
   spark.sql("CREATE OR REPLACE TABLE my_iceberg_table (id int, first_name string, last_name string, age int) USING iceberg LOCATION 's3a://<PATH_TO_TABLE>'")
   ```

3. Started a Spark Structured Streaming application to consume CDC (Debezium) records from Kafka and upsert/delete the records into my Apache Iceberg table on S3. The following code is a sample of what I executed; I've excluded components irrelevant to the issue I am trying to solve (e.g., deserialization of Kafka messages using Schema Registry).
```
from pyspark.sql import SparkSession

APP_NAME = "<APP_NAME>"
TABLE_NAME = "<TABLE_NAME>"
CHECKPOINT_LOCATION = "<CHECKPOINT_LOCATION>"
TABLE_LOCATION = "<TABLE_LOCATION>"
TOPIC_NAME = "<TOPIC_NAME>"
KAFKA_BOOTSTRAP_SERVERS = "<KAFKA_BOOTSTRAP_SERVERS>"


def write_to_s3(spark, df, target_table):
    # Deduplicate each micro-batch (keep the latest record per id by Kafka
    # offset), then apply it to the Iceberg table with MERGE INTO.
    df.createOrReplaceTempView(f"tmp_{target_table}")
    merge_sql = f"""
        MERGE INTO spark_catalog.default.{target_table} t
        USING (
            SELECT id, first_name, last_name, age, op
            FROM (
                SELECT id, first_name, last_name, age, op,
                       row_number() OVER (PARTITION BY id ORDER BY offset DESC) row_num
                FROM tmp_{target_table}
            )
            WHERE row_num = 1
        ) s
        ON t.id = s.id
        WHEN MATCHED AND s.op = 'd' THEN DELETE
        WHEN MATCHED AND s.op = 'u' THEN UPDATE SET
            t.id = s.id, t.first_name = s.first_name, t.last_name = s.last_name, t.age = s.age
        WHEN NOT MATCHED AND s.op IN ('c', 'r') THEN
            INSERT (id, first_name, last_name, age) VALUES (s.id, s.first_name, s.last_name, s.age)
    """
    # Run the MERGE on the session that owns the micro-batch DataFrame.
    df._jdf.sparkSession().sql(merge_sql)


spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("startingOffsets", "earliest")
    .option("subscribe", TOPIC_NAME)
    .load()
)

query = df.writeStream.option("checkpointLocation", CHECKPOINT_LOCATION)
write_to_s3_iceberg = query.foreachBatch(
    lambda batch_df, batch_id: write_to_s3(spark=spark, df=batch_df, target_table=TABLE_NAME)
).start()
```

Here is the full stack trace:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o283.sql.
: org.apache.spark.SparkException: Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:749)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:290)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(WriteToDataSourceV2Exec.scala:290)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at jdk.internal.reflect.GeneratedMethodAccessor144.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy37.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [s3a://<PATH_TO_PARQUET_FILE>]
    at org.apache.iceberg.MergingSnapshotProducer.validateAddedDataFiles(MergingSnapshotProducer.java:350)
    at org.apache.iceberg.BaseOverwriteFiles.validate(BaseOverwriteFiles.java:142)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:215)
    at org.apache.iceberg.BaseOverwriteFiles.apply(BaseOverwriteFiles.java:31)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:365)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:363)
    at org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31)
    at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:213)
    at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:83)
    at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation(SparkWrite.java:437)
    at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commit(SparkWrite.java:407)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:392)
    ... 75 more
    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy37.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
```
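For completeness, the batch-mode simulation mentioned above looked roughly like the following. This is a reconstructed sketch rather than the exact code from the issue: it assumes the same placeholders and the `write_to_s3` helper defined earlier, reads the same topic once with a bounded Kafka batch query, and again omits deserialization.

```
# Reconstructed sketch of the batch-mode comparison (not the exact code from
# the issue). Reads the same topic as a bounded batch query and applies the
# same MERGE via the write_to_s3 helper defined above.
batch_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", TOPIC_NAME)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# In batch mode, the same MERGE completed without the ValidationException.
write_to_s3(spark=spark, df=batch_df, target_table=TABLE_NAME)
```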
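One detail that may help anyone investigating: the trace shows the commit going through `SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation`, i.e. the `MERGE` is validated against concurrent snapshots under Iceberg's serializable isolation level before committing. As a possible experiment (not a confirmed fix), Iceberg exposes a documented `write.merge.isolation-level` table property that can be set to `snapshot` instead of the default `serializable`:

```
# Possible experiment, not a confirmed fix: relax the isolation level that
# MERGE commits on this table are validated under. 'serializable' is the
# default; 'snapshot' relaxes the conflict validation applied at commit time.
spark.sql("""
    ALTER TABLE spark_catalog.default.my_iceberg_table
    SET TBLPROPERTIES ('write.merge.isolation-level' = 'snapshot')
""")
```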
