This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9bb14d029 [spark] Supports Spark Streaming Sink (#1779)
9bb14d029 is described below
commit 9bb14d029893e8db43a7ec85642bef1be25aa3e4
Author: Yann Byron <[email protected]>
AuthorDate: Thu Aug 10 12:12:33 2023 +0800
[spark] Supports Spark Streaming Sink (#1779)
---
.../org/apache/paimon/spark/SparkSource.scala | 22 +-
.../apache/paimon/spark/sources/PaimonSink.scala | 50 ++++
.../main/scala/org/apache/spark/sql/Utils.scala | 43 ++++
.../org/apache/paimon/spark/PaimonSinkTest.scala | 281 +++++++++++++++++++++
.../spark/{sql => }/PaimonSparkTestBase.scala | 4 +-
.../paimon/spark/sql/DataFrameWriteTest.scala | 2 +-
.../paimon/spark/sql/InsertOverwriteTest.scala | 1 +
.../spark/sql/TableValuedFunctionsTest.scala | 2 +-
8 files changed, 399 insertions(+), 6 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index d73bb78df..d9675ca64 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -20,12 +20,15 @@ package org.apache.paimon.spark
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.spark.sources.PaimonSink
import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode,
SparkSession, SQLContext}
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider,
DataSourceRegister}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider,
DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,7 +39,8 @@ import scala.collection.JavaConverters._
class SparkSource
extends DataSourceRegister
with SessionConfigSupport
- with CreatableRelationProvider {
+ with CreatableRelationProvider
+ with StreamSinkProvider {
override def shortName(): String = SparkSource.NAME
@@ -78,6 +82,20 @@ class SparkSource
SparkSession.active.sessionState.newHadoopConf())
FileStoreTableFactory.create(catalogContext)
}
+
+  /**
+   * Creates the Structured Streaming [[Sink]] used by `writeStream.format("paimon")`.
+   *
+   * The table is located from `parameters`, and the same `parameters` are forwarded to the sink
+   * as write options. Only the Append and Complete output modes are supported.
+   *
+   * @param sqlContext context of the streaming query, passed on to the sink
+   * @param parameters data source options; used both to load the table and as write options
+   * @param partitionColumns partition columns passed in by Spark, forwarded unchanged
+   * @param outputMode output mode of the streaming query
+   * @return a [[PaimonSink]] writing into the loaded table
+   * @throws UnsupportedOperationException if `outputMode` is neither Append nor Complete
+   */
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    val isSupported = outputMode == OutputMode.Append() || outputMode == OutputMode.Complete()
+    if (!isSupported) {
+      // UnsupportedOperationException is a RuntimeException, so callers that catch the
+      // broader type keep working while the failure reason is stated precisely.
+      throw new UnsupportedOperationException(
+        "Paimon supports only Complete and Append output mode.")
+    }
+    val table = loadTable(parameters.asJava)
+    val options = Options.fromMap(parameters.asJava)
+    new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
+  }
+
}
object SparkSource {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
new file mode 100644
index 000000000..da33ad35c
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{InsertInto, Overwrite}
+import org.apache.paimon.spark.commands.{PaimonCommand, SchemaHelper,
WriteIntoPaimonTable}
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.{DataFrame, SQLContext, Utils}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.AlwaysTrue
+import org.apache.spark.sql.streaming.OutputMode
+
+/**
+ * A Structured Streaming [[Sink]] that writes every micro-batch into a Paimon table.
+ *
+ * In Complete mode each batch overwrites the table (`Overwrite(Some(AlwaysTrue))`); in any other
+ * mode the batch is inserted (`InsertInto`).
+ *
+ * @param sqlContext context whose Spark session executes the write
+ * @param originTable the Paimon table written to
+ * @param partitionColumns partition columns forwarded by the caller; not used by this sink
+ * @param outputMode output mode of the streaming query
+ * @param options write options passed on to [[WriteIntoPaimonTable]]
+ */
+class PaimonSink(
+    sqlContext: SQLContext,
+    override val originTable: FileStoreTable,
+    partitionColumns: Seq[String],
+    outputMode: OutputMode,
+    options: Options)
+  extends Sink
+  with SchemaHelper
+  with PaimonCommand {
+
+  /**
+   * Writes one micro-batch into `originTable`.
+   *
+   * NOTE(review): `batchId` is ignored, so if Spark replays a batch after a failure it may be
+   * written again in Append mode — confirm whether exactly-once delivery is required here.
+   *
+   * @param batchId id of the micro-batch (currently unused)
+   * @param data rows of the micro-batch
+   */
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val saveMode =
+      if (outputMode == OutputMode.Complete()) Overwrite(Some(AlwaysTrue)) else InsertInto
+    // The incoming DataFrame still carries its streaming source; rebuild it from the already
+    // planned RDD so it can be transformed by the batch write path.
+    val newData = Utils.createNewDataFrame(data)
+    WriteIntoPaimonTable(originTable, saveMode, newData, options).run(sqlContext.sparkSession)
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
new file mode 100644
index 000000000..e1bbdce7e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+/**
+ * Bridges to Spark APIs that are only accessible from within the [[org.apache.spark.sql]] package
+ * (i.e. marked private to that package). Living in this package lets Paimon code reach such APIs
+ * indirectly.
+ */
+object Utils {
+
+  /**
+   * Wraps the already-planned rows of `data` in a brand-new [[DataFrame]].
+   *
+   * In the streaming write path, transforming the [[DataFrame]] and then using the result fails
+   * with "Queries with streaming sources must be executed with writeStream.start()", because the
+   * derived [[DataFrame]] still references an unsupported streaming source (see SPARK-14473).
+   * Building a fresh [[DataFrame]] from the output of the planned
+   * [[org.apache.spark.sql.execution.SparkPlan]] sidesteps this.
+   *
+   * The incoming [[DataFrame]] has already been planned by
+   * [[org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy]] before
+   * [[org.apache.spark.sql.execution.streaming.Sink.addBatch]] is called.
+   *
+   * @param data the micro-batch [[DataFrame]]
+   * @return a [[DataFrame]] with the same schema whose rows come from `data.queryExecution.toRdd`
+   */
+  def createNewDataFrame(data: DataFrame): DataFrame = {
+    val plannedRows = data.queryExecution.toRdd
+    val rowSchema = data.schema
+    data.sqlContext.internalCreateDataFrame(plannedRows, rowSchema)
+  }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
new file mode 100644
index 000000000..36df428a5
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{col, mean, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.sql.Date
+
+/**
+ * Tests for writing Spark Structured Streaming queries into Paimon tables, both through
+ * `foreachBatch` and through the `paimon` streaming sink format.
+ */
+class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Sink: forEachBatch") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test `forEachBatch` api
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], id: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // same primary key again: only the latest value is visible
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: append mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // same primary key again: only the latest value is visible
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: complete mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define an append-only table and sink into it in complete mode
+          spark.sql(s"""
+                       |CREATE TABLE T (city String, population Long)
+                       |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("uid", "city")
+            .groupBy("city")
+            .count()
+            .toDF("city", "population")
+            .writeStream
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY city")
+
+          try {
+            // every batch replaces the table with the full, updated aggregate
+            inputData.addData((1, "HZ"), (2, "BJ"), (3, "BJ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 2L) :: Row("HZ", 1L) :: Nil)
+
+            inputData.addData((4, "SH"), (5, "BJ"), (6, "HZ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 2L) :: Row("SH", 1L) :: Nil)
+
+            inputData.addData((7, "HZ"), (8, "SH"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 3L) :: Row("SH", 2L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: update mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in update mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          // update mode is not supported by the Paimon sink, so starting the query must fail
+          intercept[RuntimeException] {
+            inputData
+              .toDF()
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .outputMode("update")
+              .format("paimon")
+              .start(location)
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: aggregation and watermark") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define an append-only table and sink into it with aggregation and watermark in
+          // append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (start Timestamp, stockId INT, avg_price DOUBLE)
+                       |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Long, Int, Double)]
+          val data = inputData
+            .toDS()
+            .toDF("time", "stockId", "price")
+            .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price")
+            .withWatermark("timestamp", "10 seconds")
+            .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
+            .agg(mean("price").as("avg_price"))
+            .select("window.start", "stockId", "avg_price")
+
+          val stream = data.writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () =>
+            spark.sql(
+              "SELECT CAST(start as BIGINT) AS start, stockId, avg_price FROM T ORDER BY start, stockId")
+
+          try {
+            inputData.addData((101L, 1, 1.0d), (102, 1, 2.0d), (104, 2, 20.0d))
+            stream.processAllAvailable()
+            inputData.addData((105L, 2, 40.0d), (107, 2, 60.0d), (115, 3, 300.0d))
+            stream.processAllAvailable()
+            // the event at t=200 advances the 10-second watermark past all earlier 5-second
+            // windows, so those are emitted; the window holding t=200 itself stays open
+            inputData.addData((200L, 99, 99.9d))
+            stream.processAllAvailable()
+            checkAnswer(
+              query(),
+              Row(100L, 1, 1.5d) :: Row(100L, 2, 20.0d) :: Row(105L, 2, 50.0d) ::
+                Row(115L, 3, 300.0d) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: enable schema evolution") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it with schema evolution in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val date = Date.valueOf("2023-08-10")
+          spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2, '2023-08-09')")
+          checkAnswer(
+            spark.sql("SELECT * FROM T ORDER BY a, b"),
+            Row(1, "2023-08-09") :: Row(2, "2023-08-09") :: Nil)
+
+          // the stream's (BIGINT, DATE, INT) rows do not match the (INT, STRING) table schema
+          val inputData = MemoryStream[(Long, Date, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .option("write.merge-schema", "true")
+            .option("write.merge-schema.explicit-cast", "true")
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1L, date, 123), (3L, date, 456))
+            stream.processAllAvailable()
+
+            // after merging, `a` reads back as BIGINT, `b` as DATE, and the new column `c` is
+            // null for the row that was not rewritten by the stream
+            checkAnswer(
+              query(),
+              Row(1L, date, 123) :: Row(2L, Date.valueOf("2023-08-09"), null) ::
+                Row(3L, date, 456) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
similarity index 96%
rename from
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
rename to
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index d78148552..5152b506d 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark
import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory,
Identifier}
import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{PaimonSparkSessionExtension, SparkCatalog}
import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.spark.sql.WithTableOptions
import org.apache.paimon.table.AbstractFileStoreTable
import org.apache.spark.paimon.Utils
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 1bf21a8f3..a54e7d50e 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -18,9 +18,9 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.WriteMode._
+import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
-import org.scalactic.source.Position
import java.sql.Date
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
index f84fd3b27..8c638cbcd 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.WriteMode.CHANGE_LOG
+import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 5877b0cfc..16d0d797a 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.WriteMode.CHANGE_LOG
-import org.apache.paimon.spark.PaimonSparkSessionExtension
+import org.apache.paimon.spark.{PaimonSparkSessionExtension,
PaimonSparkTestBase}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row}