This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 239bf4b10 [spark] supports saving a DataFrame to a paimon location (#1609)
239bf4b10 is described below
commit 239bf4b10d5f55024bfc8975652a933b3678616f
Author: Yann Byron <[email protected]>
AuthorDate: Thu Jul 20 13:44:04 2023 +0800
[spark] supports saving a DataFrame to a paimon location (#1609)
---
.../java/org/apache/paimon/spark/SparkSource.java | 79 -------------------
.../scala/org/apache/paimon/spark/SaveMode.scala | 18 ++++-
.../org/apache/paimon/spark/SparkSource.scala | 92 ++++++++++++++++++++++
.../spark/commands/WriteIntoPaimonTable.scala | 5 +-
.../paimon/spark/sql/DataFrameWriteTest.scala | 74 +++++++++++++++++
.../paimon/spark/sql/InsertOverwriteTest.scala | 11 +--
.../paimon/spark/sql/PaimonSparkTestBase.scala | 30 +++++--
.../spark/sql/TableValuedFunctionsTest.scala | 7 +-
.../paimon/spark/sql/WithTableOptions.scala} | 14 ++--
9 files changed, 225 insertions(+), 105 deletions(-)
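
For context on the feature itself: the new CreatableRelationProvider path lets a DataFrame be written directly to a Paimon table location through the "paimon" format. Below is a minimal usage sketch modeled on DataFrameWriteTest; the SparkSession setup and the table path are illustrative assumptions, not part of this commit.

    // Sketch: assumes a Paimon table already exists at `tablePath`
    // (e.g. created beforehand with Spark SQL, as in the tests); the path is hypothetical.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val tablePath = "/tmp/paimon/test.db/T"

    // Append rows to the table backing this location.
    Seq((1, "a"), (2, "b")).toDF("a", "b")
      .write.format("paimon").mode("append").save(tablePath)

    // Overwrite the table contents; SaveMode.transform maps this to Overwrite(AlwaysTrue).
    Seq((4, "d"), (5, "e")).toDF("a", "b")
      .write.format("paimon").mode("overwrite").save(tablePath)
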
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkSource.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkSource.java
deleted file mode 100644
index a30e26dc1..000000000
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkSource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTableFactory;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.Map;
-
-/** The spark source for paimon. */
-public class SparkSource implements DataSourceRegister, SessionConfigSupport {
-
- /** Not use 'paimon' here, the '-' is not allowed in SQL. */
- private static final String SHORT_NAME = "paimon";
-
- @Override
- public String shortName() {
- return SHORT_NAME;
- }
-
- @Override
- public StructType inferSchema(CaseInsensitiveStringMap options) {
- // ignore schema.
- // getTable will get schema by itself.
- return null;
- }
-
- @Override
- public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
- // ignore partition.
- // getTable will get partition by itself.
- return null;
- }
-
- @Override
- public boolean supportsExternalMetadata() {
- return true;
- }
-
- @Override
- public Table getTable(
- StructType schema, Transform[] partitioning, Map<String, String> options) {
- CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(options),
- SparkSession.active().sessionState().newHadoopConf());
- return new SparkTable(FileStoreTableFactory.create(catalogContext));
- }
-
- @Override
- public String keyPrefix() {
- return SHORT_NAME;
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
index b4230d00f..9badc628b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
@@ -17,7 +17,8 @@
*/
package org.apache.paimon.spark
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.{SaveMode => SparkSaveMode}
+import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
sealed trait SaveMode extends Serializable
@@ -26,3 +27,18 @@ object InsertInto extends SaveMode
case class Overwrite(filters: Option[Filter]) extends SaveMode
object DynamicOverWrite extends SaveMode
+
+object ErrorIfExists extends SaveMode
+
+object Ignore extends SaveMode
+
+object SaveMode {
+ def transform(saveMode: SparkSaveMode): SaveMode = {
+ saveMode match {
+ case SparkSaveMode.Overwrite => Overwrite(Some(AlwaysTrue))
+ case SparkSaveMode.Ignore => Ignore
+ case SparkSaveMode.Append => InsertInto
+ case SparkSaveMode.ErrorIfExists => ErrorIfExists
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
new file mode 100644
index 000000000..ae940c469
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
+
+import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class SparkSource
+ extends DataSourceRegister
+ with SessionConfigSupport
+ with CreatableRelationProvider {
+
+ override def shortName(): String = SparkSource.NAME
+
+ override def keyPrefix(): String = SparkSource.NAME
+
+ override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+ // ignore schema.
+ // getTable will get schema by itself.
+ null
+ }
+
+ override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
+ // ignore partition.
+ // getTable will get partition by itself.
+ null
+ }
+
+ override def getTable(
+ schema: StructType,
+ partitioning: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ new SparkTable(loadTable(properties))
+ }
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ mode: SparkSaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ val table = loadTable(parameters.asJava)
+ WriteIntoPaimonTable(table, SaveMode.transform(mode), data).run(sqlContext.sparkSession)
+ SparkSource.toBaseRelation(table, sqlContext)
+ }
+
+ private def loadTable(options: util.Map[String, String]): FileStoreTable = {
+ val catalogContext = CatalogContext.create(
+ Options.fromMap(options),
+ SparkSession.active.sessionState.newHadoopConf())
+ FileStoreTableFactory.create(catalogContext)
+ }
+}
+
+object SparkSource {
+
+ val NAME = "paimon"
+
+ def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
+ new BaseRelation {
+ override def sqlContext: SQLContext = _sqlContext
+ override def schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 0a482fdad..6ec4987d4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -20,8 +20,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.index.PartitionIndex
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode}
-import org.apache.paimon.spark.SparkRow
+import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
@@ -159,6 +158,8 @@ case class WriteIntoPaimonTable(_table: FileStoreTable, saveMode: SaveMode, data
case DynamicOverWrite =>
dynamicPartitionOverwriteMode = true
throw new UnsupportedOperationException("Dynamic Overwrite is
unsupported for now.")
+ case _ =>
+ throw new UnsupportedOperationException(s"This mode is unsupported for now.")
}
(dynamicPartitionOverwriteMode, overwritePartition)
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
new file mode 100644
index 000000000..ae95e505d
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.WriteMode._
+
+import org.apache.spark.sql.Row
+import org.scalactic.source.Position
+
+class DataFrameWriteTest extends PaimonSparkTestBase {
+
+ writeModes.foreach {
+ writeMode =>
+ bucketModes.foreach {
+ bucket =>
+ test(s"Write data into Paimon directly: write-mode: $writeMode,
bucket: $bucket") {
+
+ val _spark = spark
+ import _spark.implicits._
+
+ val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+ "'primary-key'='a',"
+ } else {
+ ""
+ }
+
+ spark.sql(
+ s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ($primaryKeysProp 'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+ |""".stripMargin)
+
+ val paimonTable = loadTable("T")
+ val location = paimonTable.location().getPath
+
+ val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
+ df1.write.format("paimon").mode("append").save(location)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a, b"),
+ Row(1, "a") :: Row(2, "b") :: Nil)
+
+ val df2 = Seq((1, "a2"), (3, "c")).toDF("a", "b")
+ df2.write.format("paimon").mode("append").save(location)
+ val expected = if (writeMode == CHANGE_LOG) {
+ Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil
+ } else {
+ Row(1, "a") :: Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil
+ }
+ checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected)
+
+ val df3 = Seq((4, "d"), (5, "e")).toDF("a", "b")
+ df3.write.format("paimon").mode("overwrite").save(location)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a, b"),
+ Row(4, "d") :: Row(5, "e") :: Nil)
+ }
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
index 832a96d83..1289387b0 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
@@ -17,7 +17,7 @@
*/
package org.apache.paimon.spark.sql
-import org.apache.paimon.WriteMode.{APPEND_ONLY, CHANGE_LOG}
+import org.apache.paimon.WriteMode.CHANGE_LOG
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
@@ -25,10 +25,7 @@ import org.scalactic.source.Position
class InsertOverwriteTest extends PaimonSparkTestBase {
- // 3: fixed bucket, -1: dynamic bucket
- private val bucketModes = Seq(3, -1)
-
- Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+ writeModes.foreach {
writeMode =>
bucketModes.foreach {
bucket =>
@@ -58,7 +55,7 @@ class InsertOverwriteTest extends PaimonSparkTestBase {
}
}
- Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+ writeModes.foreach {
writeMode =>
bucketModes.foreach {
bucket =>
@@ -98,7 +95,7 @@ class InsertOverwriteTest extends PaimonSparkTestBase {
}
}
- Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+ writeModes.foreach {
writeMode =>
bucketModes.foreach {
bucket =>
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
index 797c24f69..d78148552 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
@@ -17,7 +17,11 @@
*/
package org.apache.paimon.spark.sql
-import org.apache.paimon.spark.{PaimonSparkSessionExtension, SparkCatalog, SparkGenericCatalog}
+import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{PaimonSparkSessionExtension, SparkCatalog}
+import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.table.AbstractFileStoreTable
import org.apache.spark.paimon.Utils
import org.apache.spark.sql.QueryTest
@@ -27,10 +31,14 @@ import org.scalatest.Tag
import java.io.File
-class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
+class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTableOptions {
protected lazy val tempDBDir: File = Utils.createTempDir
+ protected lazy val catalog: Catalog = initCatalog()
+
+ protected val dbName0: String = "test"
+
protected val tableName0: String = "T"
override protected def sparkConf = {
@@ -42,14 +50,14 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
override protected def beforeAll(): Unit = {
super.beforeAll()
- spark.sql("CREATE DATABASE paimon.db")
- spark.sql("USE paimon.db")
+ spark.sql(s"CREATE DATABASE paimon.$dbName0")
+ spark.sql(s"USE paimon.$dbName0")
}
override protected def afterAll(): Unit = {
try {
spark.sql("USE default")
- spark.sql("DROP DATABASE paimon.db CASCADE")
+ spark.sql(s"DROP DATABASE paimon.$dbName0 CASCADE")
} finally {
super.afterAll()
}
@@ -65,4 +73,16 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession {
println(testName)
super.test(testName, testTags: _*)(testFun)(pos)
}
+
+ private def initCatalog(): Catalog = {
+ val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name()
+ val options = Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf)
+ val catalogContext =
+ CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf());
+ CatalogFactory.createCatalog(catalogContext);
+ }
+
+ def loadTable(tableName: String): AbstractFileStoreTable = {
+ catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[AbstractFileStoreTable]
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 9df31ca37..5877b0cfc 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -17,7 +17,7 @@
*/
package org.apache.paimon.spark.sql
-import org.apache.paimon.WriteMode.{APPEND_ONLY, CHANGE_LOG}
+import org.apache.paimon.WriteMode.CHANGE_LOG
import org.apache.paimon.spark.PaimonSparkSessionExtension
import org.apache.spark.SparkConf
@@ -31,10 +31,7 @@ class TableValuedFunctionsTest extends PaimonSparkTestBase {
.set("spark.sql.extensions",
classOf[PaimonSparkSessionExtension].getName)
}
- // 3: fixed bucket, -1: dynamic bucket
- private val bucketModes = Seq(3, -1)
-
- Seq(APPEND_ONLY, CHANGE_LOG).foreach {
+ writeModes.foreach {
writeMode =>
bucketModes.foreach {
bucket =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala
similarity index 72%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
copy to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala
index b4230d00f..fe309db00 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SaveMode.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala
@@ -15,14 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.spark.sql.sources.Filter
+import org.apache.paimon.WriteMode
+import org.apache.paimon.WriteMode._
-sealed trait SaveMode extends Serializable
+trait WithTableOptions {
-object InsertInto extends SaveMode
+ // 3: fixed bucket, -1: dynamic bucket
+ protected val bucketModes: Seq[Int] = Seq(3, -1)
-case class Overwrite(filters: Option[Filter]) extends SaveMode
+ protected val writeModes: Seq[WriteMode] = Seq(CHANGE_LOG, APPEND_ONLY)
-object DynamicOverWrite extends SaveMode
+}