This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new af42c2974 [spark] supports dynamic insert overwrite (#1756)
af42c2974 is described below
commit af42c29743d73ee64d8e49cf671bf685d317dc06
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Aug 9 10:41:40 2023 +0800
[spark] supports dynamic insert overwrite (#1756)
---
.../java/org/apache/paimon/spark/SparkTable.java | 5 +
.../org/apache/paimon/spark/PaimonAnalysis.scala | 21 +++-
.../spark/commands/WriteIntoPaimonTable.scala | 2 +-
.../PaimonDynamicPartitionOverwriteCommand.scala | 72 ++++++++++++
.../paimon/spark/sql/InsertOverwriteTest.scala | 121 ++++++++++++++++++++-
5 files changed, 218 insertions(+), 3 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 2cd64c4bc..99d6513cb 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -61,6 +61,10 @@ public class SparkTable
this.table = table;
}
+    /** Returns the Paimon {@link Table} this Spark table was constructed with. */
+    public Table getTable() {
+        return this.table;
+    }
+
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
Table newTable = table.copy(options.asCaseSensitiveMap());
@@ -83,6 +87,7 @@ public class SparkTable
capabilities.add(TableCapability.BATCH_READ);
capabilities.add(TableCapability.V1_BATCH_WRITE);
capabilities.add(TableCapability.OVERWRITE_BY_FILTER);
+ capabilities.add(TableCapability.OVERWRITE_DYNAMIC);
return capabilities;
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
index df2cb3884..9b1587fc4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonAnalysis.scala
@@ -17,9 +17,13 @@
*/
package org.apache.paimon.spark
+import org.apache.paimon.table.FileStoreTable
+
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import
org.apache.spark.sql.paimon.commands.PaimonDynamicPartitionOverwriteCommand
class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
@@ -28,6 +32,21 @@ class PaimonAnalysis(session: SparkSession) extends
Rule[LogicalPlan] {
case func: PaimonTableValueFunction if func.args.forall(_.resolved) =>
PaimonTableValuedFunctions.resolvePaimonTableValuedFunction(session,
func)
+ case o @ OverwritePartitionsDynamicPaimon(r, d) if o.resolved =>
+ PaimonDynamicPartitionOverwriteCommand(r, d, o.query, o.writeOptions,
o.isByName)
}
+}
+object OverwritePartitionsDynamicPaimon {
+
+  /**
+   * Extracts the target relation and its Paimon [[FileStoreTable]] from a dynamic partition
+   * overwrite whose input query is already resolved.
+   *
+   * Returns `None` in three cases:
+   *   - the query is not resolved yet;
+   *   - the target is not a [[DataSourceV2Relation]] over a [[SparkTable]];
+   *   - the wrapped Paimon table is not a [[FileStoreTable]].
+   *
+   * The last check replaces an unconditional cast. Callers that pattern-match on this extractor
+   * therefore simply do not match such plans, instead of failing with a `ClassCastException`.
+   */
+  def unapply(o: OverwritePartitionsDynamic): Option[(DataSourceV2Relation, FileStoreTable)] = {
+    o.table match {
+      case r: DataSourceV2Relation if o.query.resolved =>
+        r.table match {
+          case sparkTable: SparkTable =>
+            Option(sparkTable.getTable).collect {
+              case fileStoreTable: FileStoreTable => (r, fileStoreTable)
+            }
+          case _ => None
+        }
+      case _ => None
+    }
+  }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 4324a797f..0954e2136 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -168,7 +168,7 @@ case class WriteIntoPaimonTable(
}
case DynamicOverWrite =>
dynamicPartitionOverwriteMode = true
- throw new UnsupportedOperationException("Dynamic Overwrite is
unsupported for now.")
+ Map.empty[String, String]
case _ =>
throw new UnsupportedOperationException(s" This mode is unsupported
for now.")
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/commands/PaimonDynamicPartitionOverwriteCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/commands/PaimonDynamicPartitionOverwriteCommand.scala
new file mode 100644
index 000000000..842472a2e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/commands/PaimonDynamicPartitionOverwriteCommand.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.paimon.commands
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.DynamicOverWrite
+import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.NamedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan,
V2WriteCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
+
+import scala.collection.convert.ImplicitConversions._
+
+/**
+ * Runs a dynamic partition overwrite into a Paimon table by delegating to [[WriteIntoPaimonTable]]
+ * with the `DynamicOverWrite` save mode.
+ *
+ * Spark does not offer a V1 fallback for dynamic partition overwrite, so this command is used as a
+ * workaround. Two design points matter:
+ *   - It mixes in [[V2WriteCommand]] so Spark can keep transforming it like other V2 writes.
+ *   - It exposes `query` as its child so the Spark optimizer can optimize the input plan.
+ *
+ * @param table          the resolved target relation
+ * @param fileStoreTable the Paimon table that receives the data
+ * @param query          the plan producing the rows to write
+ * @param writeOptions   per-write options; they are layered over the table's own options
+ * @param isByName       carried over from the originating plan; not read by `run`
+ */
+case class PaimonDynamicPartitionOverwriteCommand(
+    table: NamedRelation,
+    fileStoreTable: FileStoreTable,
+    query: LogicalPlan,
+    writeOptions: Map[String, String],
+    isByName: Boolean)
+  extends RunnableCommand
+  with V2WriteCommand {
+
+  // The write input is this node's only child, so tree transformations rewrite it in place.
+  override def child: LogicalPlan = query
+
+  override protected def withNewChildInternal(
+      newChild: LogicalPlan): PaimonDynamicPartitionOverwriteCommand =
+    withNewQuery(newChild)
+
+  override def withNewQuery(newQuery: LogicalPlan): PaimonDynamicPartitionOverwriteCommand =
+    copy(query = newQuery)
+
+  override def withNewTable(newTable: NamedRelation): PaimonDynamicPartitionOverwriteCommand =
+    copy(table = newTable)
+
+  // `run` executes `query` itself, so there is no separate analyzed copy to keep.
+  override def storeAnalyzedQuery(): Command = copy()
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val data = Dataset.ofRows(sparkSession, query)
+    // On key collisions the per-write options win, because they are the right operand of `++`.
+    val mergedOptions = Options.fromMap(fileStoreTable.options() ++ writeOptions)
+    WriteIntoPaimonTable(fileStoreTable, DynamicOverWrite, data, mergedOptions).run(sparkSession)
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
index 1289387b0..f84fd3b27 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
@@ -21,7 +21,6 @@ import org.apache.paimon.WriteMode.CHANGE_LOG
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
-import org.scalactic.source.Position
class InsertOverwriteTest extends PaimonSparkTestBase {
@@ -215,4 +214,124 @@ class InsertOverwriteTest extends PaimonSparkTestBase {
Row(4, "d", sv2.value) :: Row(5, "e", sv1.value) :: Row(7, "g",
sv1.value) :: Nil)
}
}
+
+  // One test is registered per (write mode, bucket mode) combination.
+  for (writeMode <- writeModes; bucket <- bucketModes) {
+    test(
+      s"dynamic insert overwrite single-partitioned table: write-mode: $writeMode, bucket: $bucket") {
+      // Only change-log tables declare a primary key in this test.
+      val primaryKeysProp = if (writeMode == CHANGE_LOG) "'primary-key'='a,b'," else ""
+      def sortedRows = spark.sql("SELECT * FROM T ORDER BY a, b")
+
+      spark.sql(s"""
+                   |CREATE TABLE T (a INT, b INT, c STRING)
+                   |TBLPROPERTIES ($primaryKeysProp 'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                   |PARTITIONED BY (a)
+                   |""".stripMargin)
+
+      spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
+      checkAnswer(sortedRows, Seq(Row(1, 1, "1"), Row(2, 2, "2")))
+
+      // overwrite the whole table
+      spark.sql("INSERT OVERWRITE T VALUES (1, 3, '3'), (2, 4, '4')")
+      checkAnswer(sortedRows, Seq(Row(1, 3, "3"), Row(2, 4, "4")))
+
+      withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "dynamic") {
+        // Only partition a=1 appears in the data, so partition a=2 must keep its row.
+        spark.sql("INSERT OVERWRITE T VALUES (1, 5, '5'), (1, 7, '7')")
+        checkAnswer(sortedRows, Seq(Row(1, 5, "5"), Row(1, 7, "7"), Row(2, 4, "4")))
+      }
+    }
+  }
+
+  // One test is registered per (write mode, bucket mode) combination.
+  for (writeMode <- writeModes; bucket <- bucketModes) {
+    test(
+      s"dynamic insert overwrite multi-partitioned table: write-mode: $writeMode, bucket: $bucket") {
+      // Only change-log tables declare a primary key in this test.
+      val primaryKeysProp = if (writeMode == CHANGE_LOG) "'primary-key'='a,pt1,pt2'," else ""
+      def sortedRows = spark.sql("SELECT * FROM T ORDER BY a, b")
+
+      spark.sql(s"""
+                   |CREATE TABLE T (a INT, b STRING, pt1 STRING, pt2 INT)
+                   |TBLPROPERTIES ($primaryKeysProp 'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                   |PARTITIONED BY (pt1, pt2)
+                   |""".stripMargin)
+
+      spark.sql(
+        "INSERT INTO T values (1, 'a', 'ptv1', 11), (2, 'b', 'ptv1', 11), (3, 'c', 'ptv1', 22), (4, 'd', 'ptv2', 22)")
+      checkAnswer(
+        sortedRows,
+        Seq(
+          Row(1, "a", "ptv1", 11),
+          Row(2, "b", "ptv1", 11),
+          Row(3, "c", "ptv1", 22),
+          Row(4, "d", "ptv2", 22)))
+
+      withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "dynamic") {
+        // Static pt2 = 22, dynamic pt1: only (ptv1, 22) and (ptv3, 22) are replaced.
+        // (ptv2, 22) does not appear in the data, so its row is kept.
+        spark.sql(
+          "INSERT OVERWRITE T PARTITION (pt2 = 22) VALUES (3, 'c2', 'ptv1'), (4, 'd2', 'ptv3')")
+        checkAnswer(
+          sortedRows,
+          Seq(
+            Row(1, "a", "ptv1", 11),
+            Row(2, "b", "ptv1", 11),
+            Row(3, "c2", "ptv1", 22),
+            Row(4, "d", "ptv2", 22),
+            Row(4, "d2", "ptv3", 22)))
+
+        // Static pt1 = 'ptv3', dynamic pt2: only (ptv3, 22) is replaced.
+        spark.sql("INSERT OVERWRITE T PARTITION (pt1 = 'ptv3') VALUES (4, 'd3', 22)")
+        checkAnswer(
+          sortedRows,
+          Seq(
+            Row(1, "a", "ptv1", 11),
+            Row(2, "b", "ptv1", 11),
+            Row(3, "c2", "ptv1", 22),
+            Row(4, "d", "ptv2", 22),
+            Row(4, "d3", "ptv3", 22)))
+
+        // Fully static spec: exactly the (ptv1, 11) partition is replaced.
+        spark.sql("INSERT OVERWRITE T PARTITION (pt1 = 'ptv1', pt2=11) VALUES (5, 'e')")
+        checkAnswer(
+          sortedRows,
+          Seq(
+            Row(3, "c2", "ptv1", 22),
+            Row(4, "d", "ptv2", 22),
+            Row(4, "d3", "ptv3", 22),
+            Row(5, "e", "ptv1", 11)))
+
+        // No partition spec: only partitions present in the data are replaced.
+        // (ptv2, 22) survives, so this does not overwrite the whole table.
+        spark.sql(
+          "INSERT OVERWRITE T VALUES (1, 'a5', 'ptv1', 11), (3, 'c5', 'ptv1', 22), (4, 'd5', 'ptv3', 22)")
+        checkAnswer(
+          sortedRows,
+          Seq(
+            Row(1, "a5", "ptv1", 11),
+            Row(3, "c5", "ptv1", 22),
+            Row(4, "d", "ptv2", 22),
+            Row(4, "d5", "ptv3", 22)))
+      }
+    }
+  }
+
}