This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 239bf4b10 [spark] supports saving a DataFrame to a paimon location (#1609)
239bf4b10 is described below

commit 239bf4b10d5f55024bfc8975652a933b3678616f
Author: Yann Byron <[email protected]>
AuthorDate: Thu Jul 20 13:44:04 2023 +0800

    [spark] supports saving a DataFrame to a paimon location (#1609)
---
 .../java/org/apache/paimon/spark/SparkSource.java  | 79 -------------------
 .../scala/org/apache/paimon/spark/SaveMode.scala   | 18 ++++-
 .../org/apache/paimon/spark/SparkSource.scala      | 92 ++++++++++++++++++++++
 .../spark/commands/WriteIntoPaimonTable.scala      |  5 +-
 .../paimon/spark/sql/DataFrameWriteTest.scala      | 74 +++++++++++++++++
 .../paimon/spark/sql/InsertOverwriteTest.scala     | 11 +--
 .../paimon/spark/sql/PaimonSparkTestBase.scala     | 30 +++++--
 .../spark/sql/TableValuedFunctionsTest.scala       |  7 +-
 .../paimon/spark/sql/WithTableOptions.scala}       | 14 ++--
 9 files changed, 225 insertions(+), 105 deletions(-)
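
For context, the new DataFrameWriteTest exercises the added write path roughly as in the sketch below. This is an illustrative Scala snippet, not part of the commit: it assumes an active SparkSession named spark and a Paimon table already created at the hypothetical path used for location; the comments follow the new SaveMode.transform mapping.

    // Illustrative sketch only; the path below is an assumption, not from the commit.
    import spark.implicits._
    val location = "/tmp/paimon/test.db/T"                      // hypothetical existing Paimon table path
    val df = Seq((1, "a"), (2, "b")).toDF("a", "b")
    df.write.format("paimon").mode("append").save(location)     // SaveMode.transform => InsertInto
    df.write.format("paimon").mode("overwrite").save(location)  // SaveMode.transform => Overwrite(AlwaysTrue)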

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkSource.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkSource.java
deleted file mode 100644
index a30e26dc1..000000000
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkSource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTableFactory;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.Map;
-
-/** The spark source for paimon. */
-public class SparkSource implements DataSourceRegister, SessionConfigSupport {
-
-    /** Not use 'paimon' here, the '-' is not allowed in SQL. */
-    private static final String SHORT_NAME = "paimon";
-
-    @Override
-    public String shortName() {
-        return SHORT_NAME;
-    }
-
-    @Override
-    public StructType inferSchema(CaseInsensitiveStringMap options) {
-        // ignore schema.
-        // getTable will get schema by itself.
-        return null;
-    }
-
-    @Override
-    public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
-        // ignore partition.
-        // getTable will get partition by itself.
-        return null;
-    }
-
-    @Override
-    public boolean supportsExternalMetadata() {
-        return true;
-    }
-
-    @Override
-    public Table getTable(
-            StructType schema, Transform[] partitioning, Map<String, String> options) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(options),
-                        SparkSession.active().sessionState().newHadoopConf());
-        return new SparkTable(FileStoreTableFactory.create(catalogContext));
-    }
-
-    @Override
-    public String keyPrefix() {
-        return SHORT_NAME;
-    }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
index b4230d00f..9badc628b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
@@ -17,7 +17,8 @@
  */
 package org.apache.paimon.spark
 
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.{SaveMode => SparkSaveMode}
+import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
 
 sealed trait SaveMode extends Serializable
 
@@ -26,3 +27,18 @@ object InsertInto extends SaveMode
 case class Overwrite(filters: Option[Filter]) extends SaveMode
 
 object DynamicOverWrite extends SaveMode
+
+object ErrorIfExists extends SaveMode
+
+object Ignore extends SaveMode
+
+object SaveMode {
+  def transform(saveMode: SparkSaveMode): SaveMode = {
+    saveMode match {
+      case SparkSaveMode.Overwrite => Overwrite(Some(AlwaysTrue))
+      case SparkSaveMode.Ignore => Ignore
+      case SparkSaveMode.Append => InsertInto
+      case SparkSaveMode.ErrorIfExists => ErrorIfExists
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
new file mode 100644
index 000000000..ae940c469
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
+
+import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class SparkSource
+  extends DataSourceRegister
+  with SessionConfigSupport
+  with CreatableRelationProvider {
+
+  override def shortName(): String = SparkSource.NAME
+
+  override def keyPrefix(): String = SparkSource.NAME
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    // ignore schema.
+    // getTable will get schema by itself.
+    null
+  }
+
+  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
+    // ignore partition.
+    // getTable will get partition by itself.
+    null
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    new SparkTable(loadTable(properties))
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SparkSaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    val table = loadTable(parameters.asJava)
+    WriteIntoPaimonTable(table, SaveMode.transform(mode), data).run(sqlContext.sparkSession)
+    SparkSource.toBaseRelation(table, sqlContext)
+  }
+
+  private def loadTable(options: util.Map[String, String]): FileStoreTable = {
+    val catalogContext = CatalogContext.create(
+      Options.fromMap(options),
+      SparkSession.active.sessionState.newHadoopConf())
+    FileStoreTableFactory.create(catalogContext)
+  }
+}
+
+object SparkSource {
+
+  val NAME = "paimon"
+
+  def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
+    new BaseRelation {
+      override def sqlContext: SQLContext = _sqlContext
+      override def schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 0a482fdad..6ec4987d4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -20,8 +20,7 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.index.PartitionIndex
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode}
-import org.apache.paimon.spark.SparkRow
+import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode, SparkRow}
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
@@ -159,6 +158,8 @@ case class WriteIntoPaimonTable(_table: FileStoreTable, saveMode: SaveMode, data
       case DynamicOverWrite =>
         dynamicPartitionOverwriteMode = true
        throw new UnsupportedOperationException("Dynamic Overwrite is unsupported for now.")
+      case _ =>
+        throw new UnsupportedOperationException(s"This mode is unsupported for now.")
     }
     (dynamicPartitionOverwriteMode, overwritePartition)
   }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
new file mode 100644
index 000000000..ae95e505d
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.WriteMode._
+
+import org.apache.spark.sql.Row
+import org.scalactic.source.Position
+
+class DataFrameWriteTest extends PaimonSparkTestBase {
+
+  writeModes.foreach {
+    writeMode =>
+      bucketModes.foreach {
+        bucket =>
+          test(s"Write data into Paimon directly: write-mode: $writeMode, bucket: $bucket") {
+
+            val _spark = spark
+            import _spark.implicits._
+
+            val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+              "'primary-key'='a',"
+            } else {
+              ""
+            }
+
+            spark.sql(
+              s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ($primaryKeysProp 'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                 |""".stripMargin)
+
+            val paimonTable = loadTable("T")
+            val location = paimonTable.location().getPath
+
+            val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
+            df1.write.format("paimon").mode("append").save(location)
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(1, "a") :: Row(2, "b") :: Nil)
+
+            val df2 = Seq((1, "a2"), (3, "c")).toDF("a", "b")
+            df2.write.format("paimon").mode("append").save(location)
+            val expected = if (writeMode == CHANGE_LOG) {
+              Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil
+            } else {
+              Row(1, "a") :: Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil
+            }
+            checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected)
+
+            val df3 = Seq((4, "d"), (5, "e")).toDF("a", "b")
+            df3.write.format("paimon").mode("overwrite").save(location)
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(4, "d") :: Row(5, "e") :: Nil)
+          }
+      }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
index 832a96d83..1289387b0 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.WriteMode.{APPEND_ONLY, CHANGE_LOG}
+import org.apache.paimon.WriteMode.CHANGE_LOG
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
@@ -25,10 +25,7 @@ import org.scalactic.source.Position
 
 class InsertOverwriteTest extends PaimonSparkTestBase {
 
-  // 3: fixed bucket, -1: dynamic bucket
-  private val bucketModes = Seq(3, -1)
-
-  Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+  writeModes.foreach {
     writeMode =>
       bucketModes.foreach {
         bucket =>
@@ -58,7 +55,7 @@ class InsertOverwriteTest extends PaimonSparkTestBase {
       }
   }
 
-  Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+  writeModes.foreach {
     writeMode =>
       bucketModes.foreach {
         bucket =>
@@ -98,7 +95,7 @@ class InsertOverwriteTest extends PaimonSparkTestBase {
       }
   }
 
-  Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+  writeModes.foreach {
     writeMode =>
       bucketModes.foreach {
         bucket =>
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
index 797c24f69..d78148552 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
@@ -17,7 +17,11 @@
  */
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.spark.{PaimonSparkSessionExtension, SparkCatalog, SparkGenericCatalog}
+import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{PaimonSparkSessionExtension, SparkCatalog}
+import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.table.AbstractFileStoreTable
 
 import org.apache.spark.paimon.Utils
 import org.apache.spark.sql.QueryTest
@@ -27,10 +31,14 @@ import org.scalatest.Tag
 
 import java.io.File
 
-class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
+class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTableOptions {
 
   protected lazy val tempDBDir: File = Utils.createTempDir
 
+  protected lazy val catalog: Catalog = initCatalog()
+
+  protected val dbName0: String = "test"
+
   protected val tableName0: String = "T"
 
   override protected def sparkConf = {
@@ -42,14 +50,14 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sql("CREATE DATABASE paimon.db")
-    spark.sql("USE paimon.db")
+    spark.sql(s"CREATE DATABASE paimon.$dbName0")
+    spark.sql(s"USE paimon.$dbName0")
   }
 
   override protected def afterAll(): Unit = {
     try {
       spark.sql("USE default")
-      spark.sql("DROP DATABASE paimon.db CASCADE")
+      spark.sql(s"DROP DATABASE paimon.$dbName0 CASCADE")
     } finally {
       super.afterAll()
     }
@@ -65,4 +73,16 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
     println(testName)
     super.test(testName, testTags: _*)(testFun)(pos)
   }
+
+  private def initCatalog(): Catalog = {
+    val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name()
+    val options = Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf)
+    val catalogContext =
+      CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf());
+    CatalogFactory.createCatalog(catalogContext);
+  }
+
+  def loadTable(tableName: String): AbstractFileStoreTable = {
+    catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[AbstractFileStoreTable]
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 9df31ca37..5877b0cfc 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.WriteMode.{APPEND_ONLY, CHANGE_LOG}
+import org.apache.paimon.WriteMode.CHANGE_LOG
 import org.apache.paimon.spark.PaimonSparkSessionExtension
 
 import org.apache.spark.SparkConf
@@ -31,10 +31,7 @@ class TableValuedFunctionsTest extends PaimonSparkTestBase {
      .set("spark.sql.extensions", classOf[PaimonSparkSessionExtension].getName)
   }
 
-  // 3: fixed bucket, -1: dynamic bucket
-  private val bucketModes = Seq(3, -1)
-
-  Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+  writeModes.foreach {
     writeMode =>
       bucketModes.foreach {
         bucket =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala
similarity index 72%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
copy to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala
index b4230d00f..fe309db00 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.spark.sql.sources.Filter
+import org.apache.paimon.WriteMode
+import org.apache.paimon.WriteMode._
 
-sealed trait SaveMode extends Serializable
+trait WithTableOptions {
 
-object InsertInto extends SaveMode
+  // 3: fixed bucket, -1: dynamic bucket
+  protected val bucketModes: Seq[Int] = Seq(3, -1)
 
-case class Overwrite(filters: Option[Filter]) extends SaveMode
+  protected val writeModes: Seq[WriteMode] = Seq(CHANGE_LOG, APPEND_ONLY)
 
-object DynamicOverWrite extends SaveMode
+}
