This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8000bc85 [spark] Harden dynamic overwrite against optimized child 
plans (#8052)
4d8000bc85 is described below

commit 4d8000bc8537cb2f07d1bc5cd1897ca84a27b5a3
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jun 4 12:14:43 2026 +0800

    [spark] Harden dynamic overwrite against optimized child plans (#8052)
    
    `PaimonDynamicPartitionOverwriteCommand` exposes its child query to
    the Spark optimizer through `V2WriteCommand`, but later wraps the same query
    back into a Dataset in `run()` before passing it to
    `WriteIntoPaimonTable`. This is fragile when the child query has already
    been optimized by Spark. The optimized plan may contain
    optimizer/planner-side placeholders, such as `DynamicPruningSubquery`,
    which are not ideal to expose again to writer-side Dataset operations.
    
    This PR makes the command-to-writer boundary more robust for the dynamic
    partition overwrite fallback path. Before passing the query to
    `WriteIntoPaimonTable`, it converts the child query into an RDD-backed
    DataFrame via `createNewDataFrame(createDataset(...))`. As a result, the
    writer consumes a clean logical plan instead of directly consuming the
    possibly optimized child plan.
---
 .../PaimonDynamicPartitionOverwriteCommand.scala   |   4 +-
 ...aimonDynamicPartitionOverwriteCommandTest.scala | 113 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 2 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
index 1edcc99b8e..0f722d4c80 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.DynamicOverWrite
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.PaimonUtils.createDataset
+import org.apache.spark.sql.PaimonUtils.{createDataset, createNewDataFrame}
 import org.apache.spark.sql.catalyst.analysis.NamedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
V2WriteCommand}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -64,7 +64,7 @@ case class PaimonDynamicPartitionOverwriteCommand(
     WriteIntoPaimonTable(
       fileStoreTable,
       DynamicOverWrite,
-      createDataset(sparkSession, query),
+      createNewDataFrame(createDataset(sparkSession, query)),
       Options.fromMap(fileStoreTable.options() ++ writeOptions)
     ).run(sparkSession)
   }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommandTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommandTest.scala
new file mode 100644
index 0000000000..ef4046ccc9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommandTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.PaimonUtils.{createDataset, createNewDataFrame}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
+import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import java.io.File
+
+class PaimonDynamicPartitionOverwriteCommandTest extends PaimonSparkTestBase {
+
+  import testImplicits._
+
+  test("dynamic overwrite consumes optimizer-safe child plan") {
+    withTempDir {
+      tempDir =>
+        withTable("paimon_target") {
+          try {
+            sql("CREATE TABLE paimon_target (id INT, pt STRING) PARTITIONED BY 
(pt)")
+            sql("INSERT INTO paimon_target VALUES (3, 'p3')")
+
+            val srcPath = new File(tempDir, "parquet_src").getCanonicalPath
+            val dimPath = new File(tempDir, "dim").getCanonicalPath
+            Seq((1, "p1"), (2, "p2"), (3, "p3"))
+              .toDF("id", "pt")
+              .write
+              .partitionBy("pt")
+              .parquet(srcPath)
+            spark.read.parquet(srcPath).createOrReplaceTempView("parquet_src")
+
+            Seq(("p1", "use"), ("p2", "use"), ("p3", "skip"))
+              .toDF("pt", "tag")
+              .write
+              .parquet(dimPath)
+            spark.read.parquet(dimPath).createOrReplaceTempView("dim")
+
+            withSparkSQLConf(
+              "spark.sql.sources.partitionOverwriteMode" -> "dynamic",
+              "spark.paimon.write.use-v2-write" -> "false",
+              "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true",
+              "spark.sql.optimizer.dynamicPartitionPruning.useStats" -> 
"false",
+              
"spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio" -> "1.0",
+              "spark.sql.autoBroadcastJoinThreshold" -> "10485760",
+              "spark.sql.adaptive.enabled" -> "false"
+            ) {
+              val insertSql =
+                """
+                  |INSERT OVERWRITE paimon_target
+                  |SELECT /*+ BROADCAST(dim) */ s.id + 100 AS id, s.pt
+                  |FROM parquet_src s JOIN dim ON s.pt = dim.pt
+                  |WHERE dim.tag = 'use'
+                  |""".stripMargin
+
+              val parsed = spark.sessionState.sqlParser.parsePlan(insertSql)
+              val analyzed =
+                spark.sessionState.analyzer.executeAndCheck(parsed, new 
QueryPlanningTracker)
+              val cmd = 
analyzed.asInstanceOf[PaimonDynamicPartitionOverwriteCommand]
+
+              val optimizedChild = 
spark.sessionState.optimizer.execute(cmd.query)
+              assert(
+                hasDynamicPruningSubquery(optimizedChild),
+                s"Expected dynamic pruning in optimized child, 
got:\n$optimizedChild")
+
+              val cmdWithOptimizedQuery = cmd.copy(query = optimizedChild)
+              val writeDataFrame =
+                createNewDataFrame(createDataset(spark, 
cmdWithOptimizedQuery.query))
+              assert(
+                
!hasDynamicPruningSubquery(writeDataFrame.queryExecution.logical),
+                s"Expected writer DataFrame to be free of dynamic pruning, 
got:\n" +
+                  writeDataFrame.queryExecution.logical
+              )
+
+              cmdWithOptimizedQuery.run(spark)
+              checkAnswer(
+                sql("SELECT * FROM paimon_target ORDER BY id"),
+                Seq(Row(3, "p3"), Row(101, "p1"), Row(102, "p2")))
+            }
+          } finally {
+            spark.catalog.dropTempView("parquet_src")
+            spark.catalog.dropTempView("dim")
+          }
+        }
+    }
+  }
+
+  private def hasDynamicPruningSubquery(plan: LogicalPlan): Boolean = {
+    plan.exists(_.expressions.exists(_.exists {
+      case _: DynamicPruningSubquery => true
+      case _ => false
+    }))
+  }
+}

Reply via email to