brandonstanleyappfolio opened a new issue, #7627:
URL: https://github.com/apache/iceberg/issues/7627

   ### Query engine
   
   Spark 3.3.0
   Iceberg 1.2.0
   Debezium 2.0.1
   
   ### Question
   
   Hi Iceberg Community, 
   
   I am reaching out to ask whether anyone has succeeded in using Spark Structured 
Streaming (PySpark) jobs to maintain upserted Iceberg tables via the [`MERGE 
INTO`](https://iceberg.apache.org/docs/1.2.0/spark-writes/#merge-into) SQL 
command inside a `foreachBatch`. While trying to achieve this, I ran into a couple 
of issues, and I would like to know if anyone has succeeded or had the same 
experience.
   
   
   I was able to perform an initial write to the Iceberg table. However, any 
subsequent insert, update, or delete of a record resulted in the following 
exception being thrown:
   
   ```
   Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [s3a://<PATH_TO_PARQUET_FILE>]
   ```
   I tried to simulate the same functionality using Spark in batch mode and did 
not get the same error, which leads me to believe the problem is specific to the 
`foreachBatch` path; a simplified sketch of that batch-mode check is included 
just below. Any help or guidance would be greatly appreciated!
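
   For reference, the batch-mode check was roughly along the following lines (a simplified sketch, not the exact script I ran): two small static DataFrames stand in for consecutive micro-batches, and the same `write_to_s3` helper from step 3 below is applied to each in turn. The second MERGE did not hit the validation error.
   ```
   # Simplified sketch of the batch-mode comparison (illustrative data only).
   batch_1 = spark.createDataFrame(
       [(1, "Jane", "Doe", 30, "c", 100)],   # initial insert
       "id int, first_name string, last_name string, age int, op string, offset bigint",
   )
   batch_2 = spark.createDataFrame(
       [(1, "Jane", "Doe", 31, "u", 101)],   # update to the same key
       "id int, first_name string, last_name string, age int, op string, offset bigint",
   )

   for batch_df in (batch_1, batch_2):
       # Reuse the same MERGE helper shown in step 3 below against the table from step 2.
       write_to_s3(spark=spark, df=batch_df, target_table="my_iceberg_table")
   ```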
   
   For additional context, I ran the following steps: 
   
   1. Started a PySpark shell
   ```
   pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0,za.co.absa:abris_2.12:6.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-hadoop-cloud_2.12:3.3.0 \
     --repositories https://packages.confluent.io/maven/ \
     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
     --conf spark.sql.catalog.spark_catalog.type=hadoop \
     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.local.type=hadoop \
     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
     --conf spark.sql.catalog.spark_catalog.warehouse=s3a://<PATH_TO_WAREHOUSE> \
     --conf spark.sql.catalog.spark_catalog.cache-enabled=false \
     --conf spark.sql.catalog.spark_catalog.local.cache-enabled=false
   ```
   
   2. Created an empty Iceberg table on S3:
   ```
   spark.sql("CREATE OR REPLACE TABLE my_iceberg_table (id int, first_name 
string, last_name string, age int) USING iceberg LOCATION 
's3a://<PATH_TO_TABLE>'")
   ```
   
   3. Started a Spark Structured Streaming application to consume CDC 
(Debezium) records from Kafka and upsert/delete the records in my Apache 
Iceberg table on S3. The following code is a sample of what I executed; I've 
excluded components that are irrelevant to the issue (e.g., deserialization of 
Kafka messages using Schema Registry), and the columns each micro-batch is 
expected to expose are sketched after the code.
   ```
   TABLE_NAME = "<TABLE_NAME>"
   CHECKPOINT_LOCATION = "<CHECKPOINT_LOCATION>"
   TABLE_LOCATION = "<TABLE_LOCATION>"
   TOPIC_NAME = "<TOPIC_NAME>"
   KAFKA_BOOTSTRAP_SERVERS = "<KAFKA_BOOTSTRAP_SERVERS>"
   
   def write_to_s3(spark, df, target_table):
       # Expose the micro-batch as a temp view so the MERGE can reference it.
       df.createOrReplaceTempView(f"tmp_{target_table}")
       merge_sql = f"""
       MERGE INTO spark_catalog.default.{target_table} t
       USING (
           -- Keep only the latest change per key, using the Kafka offset as the tiebreaker.
           SELECT id, first_name, last_name, age, op
           FROM (
               SELECT id, first_name, last_name, age, op,
                      row_number() OVER (PARTITION BY id ORDER BY offset DESC) AS row_num
               FROM tmp_{target_table}
           )
           WHERE row_num = 1
       ) s
       ON t.id = s.id
       WHEN MATCHED AND s.op = 'd' THEN DELETE
       WHEN MATCHED AND s.op = 'u' THEN UPDATE SET
           t.id = s.id, t.first_name = s.first_name, t.last_name = s.last_name, t.age = s.age
       WHEN NOT MATCHED AND s.op IN ('c', 'r') THEN INSERT (id, first_name, last_name, age)
           VALUES (s.id, s.first_name, s.last_name, s.age)
       """
       # Run the MERGE through the JVM session bound to this micro-batch's DataFrame.
       df._jdf.sparkSession().sql(merge_sql)
   
   spark = SparkSession.builder.appName(app_name).getOrCreate()
   
   df = (
       spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
       .option("startingOffsets", "earliest")
       .option("subscribe", TOPIC_NAME)
       .load()
   )
   
   query = df.writeStream.option("checkpointLocation", CHECKPOINT_LOCATION)
   
   write_to_s3_iceberg = query.foreachBatch(
       lambda batch_df, batch_id: write_to_s3(spark=spark, df=batch_df, target_table=TABLE_NAME)
   ).start()
   ```
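
   For clarity, the (elided) deserialization step is expected to leave each micro-batch with exactly the columns the `MERGE` references: the payload fields, the Debezium operation code `op`, and the Kafka `offset` column retained from the source DataFrame, which the window function uses to keep only the latest change per `id`. A sketch of that assumed schema:
   ```
   from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

   # Assumed shape of each deserialized micro-batch (illustration only); these are
   # the columns referenced by the MERGE statement in write_to_s3 above.
   expected_batch_schema = StructType([
       StructField("id", IntegerType()),         # primary key from the Debezium payload
       StructField("first_name", StringType()),
       StructField("last_name", StringType()),
       StructField("age", IntegerType()),
       StructField("op", StringType()),          # Debezium op: 'c', 'u', 'd', or 'r'
       StructField("offset", LongType()),        # Kafka offset, used to pick the latest change per id
   ])
   ```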
   
   
   Here is the full stack trace:
   ```
   py4j.protocol.Py4JJavaError: An error occurred while calling o283.sql.
   : org.apache.spark.SparkException: Writing job aborted
        at 
org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:749)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
        at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:290)
        at 
org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
        at 
org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
        at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(WriteToDataSourceV2Exec.scala:290)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at jdk.internal.reflect.GeneratedMethodAccessor144.invoke(Unknown 
Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy37.call(Unknown Source)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   Caused by: org.apache.iceberg.exceptions.ValidationException: Found 
conflicting files that can contain records matching true: 
[s3a://<PATH_TO_PARQUET_FILE>]
        at 
org.apache.iceberg.MergingSnapshotProducer.validateAddedDataFiles(MergingSnapshotProducer.java:350)
        at 
org.apache.iceberg.BaseOverwriteFiles.validate(BaseOverwriteFiles.java:142)
        at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:215)
        at 
org.apache.iceberg.BaseOverwriteFiles.apply(BaseOverwriteFiles.java:31)
        at 
org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:365)
        at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
        at 
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:363)
        at 
org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31)
        at 
org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:213)
        at 
org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:83)
        at 
org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation(SparkWrite.java:437)
        at 
org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commit(SparkWrite.java:407)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:392)
        ... 75 more
   
   
        at py4j.Protocol.getReturnValue(Protocol.java:476)
        at 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
        at com.sun.proxy.$Proxy37.call(Unknown Source)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   ```
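
   One detail from the trace that may be relevant: the commit path goes through `SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation`, i.e. the MERGE is running in copy-on-write mode with the default serializable isolation level. I have not verified whether changing either setting is the right fix (that is part of what I am asking), but for reference, these are the table properties the Iceberg docs list for controlling that behavior:
   ```
   # Sketch only: property names are from the Iceberg table-properties docs
   # (write.merge.mode defaults to copy-on-write, write.merge.isolation-level
   # defaults to serializable). I have NOT confirmed that changing them resolves
   # the conflict-validation error above.
   spark.sql("""
       ALTER TABLE spark_catalog.default.my_iceberg_table SET TBLPROPERTIES (
           'write.merge.mode' = 'merge-on-read',
           'write.merge.isolation-level' = 'snapshot'
       )
   """)
   ```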
   
   

