This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8000bc85 [spark] Harden dynamic overwrite against optimized child
plans (#8052)
4d8000bc85 is described below
commit 4d8000bc8537cb2f07d1bc5cd1897ca84a27b5a3
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jun 4 12:14:43 2026 +0800
[spark] Harden dynamic overwrite against optimized child plans (#8052)
`PaimonDynamicPartitionOverwriteCommand` exposes its child query to
Spark optimizer through `V2WriteCommand`, but later wraps the same query
back into a Dataset in `run()` before passing it to
`WriteIntoPaimonTable`. This is fragile when the child query has already
been optimized by Spark. The optimized plan may contain
optimizer/planner-side placeholders, such as `DynamicPruningSubquery`,
which are not ideal to expose again to writer-side Dataset operations.
This PR makes the command-to-writer boundary more robust for the dynamic
partition overwrite fallback path. Before passing the query to
`WriteIntoPaimonTable`, it converts the child query into an RDD-backed
DataFrame via `createNewDataFrame(createDataset(...))`. As a result, the
writer consumes a clean logical plan instead of directly consuming the
possibly optimized child plan.
---
.../PaimonDynamicPartitionOverwriteCommand.scala | 4 +-
...aimonDynamicPartitionOverwriteCommandTest.scala | 113 +++++++++++++++++++++
2 files changed, 115 insertions(+), 2 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
index 1edcc99b8e..0f722d4c80 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.DynamicOverWrite
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.PaimonUtils.createDataset
+import org.apache.spark.sql.PaimonUtils.{createDataset, createNewDataFrame}
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan,
V2WriteCommand}
import org.apache.spark.sql.execution.command.RunnableCommand
@@ -64,7 +64,7 @@ case class PaimonDynamicPartitionOverwriteCommand(
WriteIntoPaimonTable(
fileStoreTable,
DynamicOverWrite,
- createDataset(sparkSession, query),
+ createNewDataFrame(createDataset(sparkSession, query)),
Options.fromMap(fileStoreTable.options() ++ writeOptions)
).run(sparkSession)
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommandTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommandTest.scala
new file mode 100644
index 0000000000..ef4046ccc9
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommandTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.PaimonUtils.{createDataset, createNewDataFrame}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
+import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import java.io.File
+
+class PaimonDynamicPartitionOverwriteCommandTest extends PaimonSparkTestBase {
+
+ import testImplicits._
+
+ test("dynamic overwrite consumes optimizer-safe child plan") {
+ withTempDir {
+ tempDir =>
+ withTable("paimon_target") {
+ try {
+ sql("CREATE TABLE paimon_target (id INT, pt STRING) PARTITIONED BY
(pt)")
+ sql("INSERT INTO paimon_target VALUES (3, 'p3')")
+
+ val srcPath = new File(tempDir, "parquet_src").getCanonicalPath
+ val dimPath = new File(tempDir, "dim").getCanonicalPath
+ Seq((1, "p1"), (2, "p2"), (3, "p3"))
+ .toDF("id", "pt")
+ .write
+ .partitionBy("pt")
+ .parquet(srcPath)
+ spark.read.parquet(srcPath).createOrReplaceTempView("parquet_src")
+
+ Seq(("p1", "use"), ("p2", "use"), ("p3", "skip"))
+ .toDF("pt", "tag")
+ .write
+ .parquet(dimPath)
+ spark.read.parquet(dimPath).createOrReplaceTempView("dim")
+
+ withSparkSQLConf(
+ "spark.sql.sources.partitionOverwriteMode" -> "dynamic",
+ "spark.paimon.write.use-v2-write" -> "false",
+ "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true",
+ "spark.sql.optimizer.dynamicPartitionPruning.useStats" ->
"false",
+
"spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio" -> "1.0",
+ "spark.sql.autoBroadcastJoinThreshold" -> "10485760",
+ "spark.sql.adaptive.enabled" -> "false"
+ ) {
+ val insertSql =
+ """
+ |INSERT OVERWRITE paimon_target
+ |SELECT /*+ BROADCAST(dim) */ s.id + 100 AS id, s.pt
+ |FROM parquet_src s JOIN dim ON s.pt = dim.pt
+ |WHERE dim.tag = 'use'
+ |""".stripMargin
+
+ val parsed = spark.sessionState.sqlParser.parsePlan(insertSql)
+ val analyzed =
+ spark.sessionState.analyzer.executeAndCheck(parsed, new
QueryPlanningTracker)
+ val cmd =
analyzed.asInstanceOf[PaimonDynamicPartitionOverwriteCommand]
+
+ val optimizedChild =
spark.sessionState.optimizer.execute(cmd.query)
+ assert(
+ hasDynamicPruningSubquery(optimizedChild),
+ s"Expected dynamic pruning in optimized child,
got:\n$optimizedChild")
+
+ val cmdWithOptimizedQuery = cmd.copy(query = optimizedChild)
+ val writeDataFrame =
+ createNewDataFrame(createDataset(spark,
cmdWithOptimizedQuery.query))
+ assert(
+
!hasDynamicPruningSubquery(writeDataFrame.queryExecution.logical),
+ s"Expected writer DataFrame to be free of dynamic pruning,
got:\n" +
+ writeDataFrame.queryExecution.logical
+ )
+
+ cmdWithOptimizedQuery.run(spark)
+ checkAnswer(
+ sql("SELECT * FROM paimon_target ORDER BY id"),
+ Seq(Row(3, "p3"), Row(101, "p1"), Row(102, "p2")))
+ }
+ } finally {
+ spark.catalog.dropTempView("parquet_src")
+ spark.catalog.dropTempView("dim")
+ }
+ }
+ }
+ }
+
+ private def hasDynamicPruningSubquery(plan: LogicalPlan): Boolean = {
+ plan.exists(_.expressions.exists(_.exists {
+ case _: DynamicPruningSubquery => true
+ case _ => false
+ }))
+ }
+}