This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new af42c2974 [spark] supports dynamic insert overwrite (#1756)
af42c2974 is described below

commit af42c29743d73ee64d8e49cf671bf685d317dc06
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Aug 9 10:41:40 2023 +0800

    [spark] supports dynamic insert overwrite (#1756)
---
 .../java/org/apache/paimon/spark/SparkTable.java   |   5 +
 .../org/apache/paimon/spark/PaimonAnalysis.scala   |  21 +++-
 .../spark/commands/WriteIntoPaimonTable.scala      |   2 +-
 .../PaimonDynamicPartitionOverwriteCommand.scala   |  72 ++++++++++++
 .../paimon/spark/sql/InsertOverwriteTest.scala     | 121 ++++++++++++++++++++-
 5 files changed, 218 insertions(+), 3 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 2cd64c4bc..99d6513cb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -61,6 +61,10 @@ public class SparkTable
         this.table = table;
     }
 
+    public Table getTable() {
+        return table; // the Paimon table held in this SparkTable's `table` field
+    }
+
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         Table newTable = table.copy(options.asCaseSensitiveMap());
@@ -83,6 +87,7 @@ public class SparkTable
         capabilities.add(TableCapability.BATCH_READ);
         capabilities.add(TableCapability.V1_BATCH_WRITE);
         capabilities.add(TableCapability.OVERWRITE_BY_FILTER);
+        capabilities.add(TableCapability.OVERWRITE_DYNAMIC);
         return capabilities;
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
index df2cb3884..9b1587fc4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
@@ -17,9 +17,13 @@
  */
 package org.apache.paimon.spark
 
+import org.apache.paimon.table.FileStoreTable
+
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
OverwritePartitionsDynamic}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import 
org.apache.spark.sql.paimon.commands.PaimonDynamicPartitionOverwriteCommand
 
 class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
 
@@ -28,6 +32,21 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
     case func: PaimonTableValueFunction if func.args.forall(_.resolved) =>
       PaimonTableValuedFunctions.resolvePaimonTableValuedFunction(session, 
func)
 
+    case o @ OverwritePartitionsDynamicPaimon(r, d) if o.resolved =>
+      PaimonDynamicPartitionOverwriteCommand(r, d, o.query, o.writeOptions, 
o.isByName)
   }
+}
 
+object OverwritePartitionsDynamicPaimon {
+  /** Yields the relation and its FileStoreTable for a resolved overwrite of a Paimon table. */
+  def unapply(o: OverwritePartitionsDynamic): Option[(DataSourceV2Relation, FileStoreTable)] =
+    // Type patterns replace the asInstanceOf casts: any other Table gives None, not an exception.
+    o.table match {
+      case r: DataSourceV2Relation if o.query.resolved =>
+        r.table match {
+          case t: SparkTable => Option(t.getTable).collect { case f: FileStoreTable => (r, f) }
+          case _ => None
+        }
+      case _ => None
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 4324a797f..0954e2136 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -168,7 +168,7 @@ case class WriteIntoPaimonTable(
         }
       case DynamicOverWrite =>
         dynamicPartitionOverwriteMode = true
-        throw new UnsupportedOperationException("Dynamic Overwrite is 
unsupported for now.")
+        Map.empty[String, String]
       case _ =>
         throw new UnsupportedOperationException(s" This mode is unsupported 
for now.")
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/commands/PaimonDynamicPartitionOverwriteCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/commands/PaimonDynamicPartitionOverwriteCommand.scala
new file mode 100644
index 000000000..842472a2e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/commands/PaimonDynamicPartitionOverwriteCommand.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.paimon.commands
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.DynamicOverWrite
+import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.NamedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
V2WriteCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
+
+import scala.collection.convert.ImplicitConversions._
+
+/**
+ * Runs a dynamic partition overwrite by delegating to [[WriteIntoPaimonTable]].
+ *
+ * Spark offers no V1 fallback for dynamic partition overwrite, so this node stands in for it:
+ *   - it is a [[V2WriteCommand]], which lets Spark transform this plan;
+ *   - the query is exposed as its child, which lets the Spark optimizer optimize it.
+ */
+case class PaimonDynamicPartitionOverwriteCommand(
+    table: NamedRelation,
+    fileStoreTable: FileStoreTable,
+    query: LogicalPlan,
+    writeOptions: Map[String, String],
+    isByName: Boolean)
+  extends RunnableCommand
+  with V2WriteCommand {
+
+  override def child: LogicalPlan = query
+
+  override def withNewQuery(newQuery: LogicalPlan): PaimonDynamicPartitionOverwriteCommand =
+    copy(query = newQuery)
+
+  override def withNewTable(newTable: NamedRelation): PaimonDynamicPartitionOverwriteCommand =
+    copy(table = newTable)
+
+  override protected def withNewChildInternal(
+      newChild: LogicalPlan): PaimonDynamicPartitionOverwriteCommand = {
+    copy(query = newChild)
+  }
+
+  /** Writes the query result in `DynamicOverWrite` mode using table options `++` write options. */
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val data = Dataset.ofRows(sparkSession, query)
+    val mergedOptions = Options.fromMap(fileStoreTable.options() ++ writeOptions)
+    WriteIntoPaimonTable(fileStoreTable, DynamicOverWrite, data, mergedOptions).run(sparkSession)
+  }
+
+  override def storeAnalyzedQuery(): Command = copy(query = query)
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
index 1289387b0..f84fd3b27 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
@@ -21,7 +21,6 @@ import org.apache.paimon.WriteMode.CHANGE_LOG
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
-import org.scalactic.source.Position
 
 class InsertOverwriteTest extends PaimonSparkTestBase {
 
@@ -215,4 +214,124 @@ class InsertOverwriteTest extends PaimonSparkTestBase {
           Row(4, "d", sv2.value) :: Row(5, "e", sv1.value) :: Row(7, "g", 
sv1.value) :: Nil)
       }
   }
+
+  writeModes.foreach {
+    writeMode =>
+      bucketModes.foreach {
+        bucket =>
+          test(
+            s"dynamic insert overwrite single-partitioned table: write-mode: 
$writeMode, bucket: $bucket") {
+            val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+              "'primary-key'='a,b',"
+            } else {
+              ""
+            }
+
+            spark.sql(
+              s"""
+                 |CREATE TABLE T (a INT, b INT, c STRING)
+                 |TBLPROPERTIES ($primaryKeysProp 
'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                 |PARTITIONED BY (a)
+                 |""".stripMargin)
+
+            spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
+
+            // overwrite the whole table
+            spark.sql("INSERT OVERWRITE T VALUES (1, 3, '3'), (2, 4, '4')")
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
+
+            withSQLConf("spark.sql.sources.partitionOverwriteMode" -> 
"dynamic") {
+              // dynamic overwrite the a=1 partition
+              spark.sql("INSERT OVERWRITE T VALUES (1, 5, '5'), (1, 7, '7')")
+              checkAnswer(
+                spark.sql("SELECT * FROM T ORDER BY a, b"),
+                Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil)
+            }
+          }
+      }
+  }
+
+  writeModes.foreach {
+    writeMode =>
+      bucketModes.foreach {
+        bucket =>
+          test(
+            s"dynamic insert overwrite mutil-partitioned table: write-mode: 
$writeMode, bucket: $bucket") {
+            val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+              "'primary-key'='a,pt1,pt2',"
+            } else {
+              ""
+            }
+
+            spark.sql(
+              s"""
+                 |CREATE TABLE T (a INT, b STRING, pt1 STRING, pt2 INT)
+                 |TBLPROPERTIES ($primaryKeysProp 
'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                 |PARTITIONED BY (pt1, pt2)
+                 |""".stripMargin)
+
+            spark.sql(
+              "INSERT INTO T values (1, 'a', 'ptv1', 11), (2, 'b', 'ptv1', 
11), (3, 'c', 'ptv1', 22), (4, 'd', 'ptv2', 22)")
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(1, "a", "ptv1", 11) :: Row(2, "b", "ptv1", 11) :: Row(3, 
"c", "ptv1", 22) :: Row(
+                4,
+                "d",
+                "ptv2",
+                22) :: Nil)
+
+            withSQLConf("spark.sql.sources.partitionOverwriteMode" -> 
"dynamic") {
+              // pt2=22 is static, pt1 dynamic: only (ptv1, 22) and (ptv3, 22) are overwritten
+              spark.sql(
+                "INSERT OVERWRITE T PARTITION (pt2 = 22) VALUES (3, 'c2', 
'ptv1'), (4, 'd2', 'ptv3')")
+              checkAnswer(
+                spark.sql("SELECT * FROM T ORDER BY a, b"),
+                Row(1, "a", "ptv1", 11) :: Row(2, "b", "ptv1", 11) :: Row(
+                  3,
+                  "c2",
+                  "ptv1",
+                  22) :: Row(4, "d", "ptv2", 22) :: Row(4, "d2", "ptv3", 22) 
:: Nil
+              )
+
+              // dynamic overwrite the pt1=ptv3 partition
+              spark.sql("INSERT OVERWRITE T PARTITION (pt1 = 'ptv3') VALUES 
(4, 'd3', 22)")
+              checkAnswer(
+                spark.sql("SELECT * FROM T ORDER BY a, b"),
+                Row(1, "a", "ptv1", 11) :: Row(2, "b", "ptv1", 11) :: Row(
+                  3,
+                  "c2",
+                  "ptv1",
+                  22) :: Row(4, "d", "ptv2", 22) :: Row(4, "d3", "ptv3", 22) 
:: Nil
+              )
+
+              // dynamic overwrite the pt1=ptv1, pt2=11 partition
+              spark.sql("INSERT OVERWRITE T PARTITION (pt1 = 'ptv1', pt2=11) 
VALUES (5, 'e')")
+              checkAnswer(
+                spark.sql("SELECT * FROM T ORDER BY a, b"),
+                Row(3, "c2", "ptv1", 22) :: Row(4, "d", "ptv2", 22) :: Row(
+                  4,
+                  "d3",
+                  "ptv3",
+                  22) :: Row(5, "e", "ptv1", 11) :: Nil)
+
+              // no partition spec: only the partitions present in the new rows are overwritten
+              spark.sql(
+                "INSERT OVERWRITE T VALUES (1, 'a5', 'ptv1', 11), (3, 'c5', 
'ptv1', 22), (4, 'd5', 'ptv3', 22)")
+              checkAnswer(
+                spark.sql("SELECT * FROM T ORDER BY a, b"),
+                Row(1, "a5", "ptv1", 11) :: Row(3, "c5", "ptv1", 22) :: Row(
+                  4,
+                  "d",
+                  "ptv2",
+                  22) :: Row(4, "d5", "ptv3", 22) :: Nil)
+            }
+          }
+      }
+  }
+
 }

Reply via email to