This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bb14d029 [spark] Supports Spark Streaming Sink (#1779)
9bb14d029 is described below

commit 9bb14d029893e8db43a7ec85642bef1be25aa3e4
Author: Yann Byron <[email protected]>
AuthorDate: Thu Aug 10 12:12:33 2023 +0800

    [spark] Supports Spark Streaming Sink (#1779)
---
 .../org/apache/paimon/spark/SparkSource.scala      |  22 +-
 .../apache/paimon/spark/sources/PaimonSink.scala   |  50 ++++
 .../main/scala/org/apache/spark/sql/Utils.scala    |  43 ++++
 .../org/apache/paimon/spark/PaimonSinkTest.scala   | 281 +++++++++++++++++++++
 .../spark/{sql => }/PaimonSparkTestBase.scala      |   4 +-
 .../paimon/spark/sql/DataFrameWriteTest.scala      |   2 +-
 .../paimon/spark/sql/InsertOverwriteTest.scala     |   1 +
 .../spark/sql/TableValuedFunctionsTest.scala       |   2 +-
 8 files changed, 399 insertions(+), 6 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index d73bb78df..d9675ca64 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -20,12 +20,15 @@ package org.apache.paimon.spark
 import org.apache.paimon.catalog.CatalogContext
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.spark.sources.PaimonSink
 import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
 
 import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, 
SparkSession, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -36,7 +39,8 @@ import scala.collection.JavaConverters._
 class SparkSource
   extends DataSourceRegister
   with SessionConfigSupport
-  with CreatableRelationProvider {
+  with CreatableRelationProvider
+  with StreamSinkProvider {
 
   override def shortName(): String = SparkSource.NAME
 
@@ -78,6 +82,20 @@ class SparkSource
       SparkSession.active.sessionState.newHadoopConf())
     FileStoreTableFactory.create(catalogContext)
   }
+
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
+      throw new RuntimeException("Paimon supports only Complete and Append 
output mode.")
+    }
+    val table = loadTable(parameters.asJava)
+    val options = Options.fromMap(parameters.asJava)
+    new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
+  }
+
 }
 
 object SparkSource {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
new file mode 100644
index 000000000..da33ad35c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{InsertInto, Overwrite}
+import org.apache.paimon.spark.commands.{PaimonCommand, SchemaHelper, 
WriteIntoPaimonTable}
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.{DataFrame, SQLContext, Utils}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.AlwaysTrue
+import org.apache.spark.sql.streaming.OutputMode
+
+class PaimonSink(
+    sqlContext: SQLContext,
+    override val originTable: FileStoreTable,
+    partitionColumns: Seq[String],
+    outputMode: OutputMode,
+    options: Options)
+  extends Sink
+  with SchemaHelper
+  with PaimonCommand {
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val saveMode = if (outputMode == OutputMode.Complete()) {
+      Overwrite(Some(AlwaysTrue))
+    } else {
+      InsertInto
+    }
+    partitionColumns.foreach(println)
+    val newData = Utils.createNewDataFrame(data)
+    WriteIntoPaimonTable(originTable, saveMode, newData, 
options).run(sqlContext.sparkSession)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
new file mode 100644
index 000000000..e1bbdce7e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+/**
+ * Some classes or methods defined in the spark project are marked as private 
under
+ * [[org.apache.spark.sql]] package, Hence, use this class to adapt then so 
that we can use them
+ * indirectly.
+ */
+object Utils {
+
+  /**
+   * In the streaming write case, An "Queries with streaming sources must be 
executed with
+   * writeStream.start()" error will occur if we transform [[DataFrame]] first 
and then use it.
+   *
+   * That's because the new [[DataFrame]] has a streaming source that is not 
supported, see the
+   * detail: SPARK-14473. So we can create a new [[DataFrame]] using the 
origin, planned
+   * [[org.apache.spark.sql.execution.SparkPlan]].
+   *
+   * By the way, the origin [[DataFrame]] has been planned by
+   * [[org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy]] 
before call
+   * [[org.apache.spark.sql.execution.streaming.Sink.addBatch]].
+   */
+  def createNewDataFrame(data: DataFrame): DataFrame = {
+    data.sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, 
data.schema)
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
new file mode 100644
index 000000000..36df428a5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{col, mean, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.sql.Date
+
+class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Sink: forEachBatch") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test `forEachBatch` api
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], id: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: append mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: complete mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define an append-only table and sink into it in complete mode
+          spark.sql(s"""
+                       |CREATE TABLE T (city String, population Long)
+                       |TBLPROPERTIES ('write-mode'='append-only', 
'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData.toDS
+            .toDF("uid", "city")
+            .groupBy("city")
+            .count()
+            .toDF("city", "population")
+            .writeStream
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY city")
+
+          try {
+            inputData.addData((1, "HZ"), (2, "BJ"), (3, "BJ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 2L) :: Row("HZ", 1L) :: Nil)
+
+            inputData.addData((4, "SH"), (5, "BJ"), (6, "HZ"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 2L) :: Row("SH", 
1L) :: Nil)
+
+            inputData.addData((7, "HZ"), (8, "SH"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row("BJ", 3L) :: Row("HZ", 3L) :: Row("SH", 
2L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: update mode") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it in update mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val inputData = MemoryStream[(Int, String)]
+          intercept[RuntimeException] {
+            inputData
+              .toDF()
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .outputMode("update")
+              .format("paimon")
+              .start(location)
+          }
+      }
+    }
+  }
+
+  test("Paimon Sink: aggregation and watermark") {
+    withTempDir {
+      checkpointDir =>
+        // define an append-only table and sink into it with aggregation and 
watermark in append mode
+        spark.sql(s"""
+                     |CREATE TABLE T (start Timestamp, stockId INT, avg_price 
DOUBLE)
+                     |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
+                     |""".stripMargin)
+        val location = loadTable("T").location().getPath
+
+        val inputData = MemoryStream[(Long, Int, Double)]
+        val data = inputData.toDS
+          .toDF("time", "stockId", "price")
+          .selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", 
"price")
+          .withWatermark("timestamp", "10 seconds")
+          .groupBy(window($"timestamp", "5 seconds"), col("stockId"))
+          .agg(mean("price").as("avg_price"))
+          .select("window.start", "stockId", "avg_price")
+
+        val stream =
+          data.writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+        val query = () =>
+          spark.sql(
+            "SELECT CAST(start as BIGINT) AS start, stockId, avg_price FROM T 
ORDER BY start, stockId")
+
+        try {
+          inputData.addData((101L, 1, 1.0d), (102, 1, 2.0d), (104, 2, 20.0d))
+          stream.processAllAvailable()
+          inputData.addData((105L, 2, 40.0d), (107, 2, 60.0d), (115, 3, 
300.0d))
+          stream.processAllAvailable()
+          inputData.addData((200L, 99, 99.9d))
+          stream.processAllAvailable()
+          checkAnswer(
+            query(),
+            Row(100L, 1, 1.5d) :: Row(100L, 2, 20.0d) :: Row(105L, 2, 50.0d) 
:: Row(
+              115L,
+              3,
+              300.0d) :: Nil)
+        } finally {
+          if (stream != null) {
+            stream.stop()
+          }
+        }
+    }
+  }
+
+  test("Paimon Sink: enable schema evolution") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and sink into it with schema evolution 
in append mode
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().getPath
+
+          val date = Date.valueOf("2023-08-10")
+          spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2, 
'2023-08-09')")
+          checkAnswer(
+            spark.sql("SELECT * FROM T ORDER BY a, b"),
+            Row(1, "2023-08-09") :: Row(2, "2023-08-09") :: Nil)
+
+          val inputData = MemoryStream[(Long, Date, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .option("write.merge-schema", "true")
+            .option("write.merge-schema.explicit-cast", "true")
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1L, date, 123), (3L, date, 456))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row(1L, date, 123) :: Row(2L, Date.valueOf("2023-08-09"), null) 
:: Row(
+                3L,
+                date,
+                456) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
similarity index 96%
rename from 
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
rename to 
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index d78148552..5152b506d 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark
 
 import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, 
Identifier}
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{PaimonSparkSessionExtension, SparkCatalog}
 import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.spark.sql.WithTableOptions
 import org.apache.paimon.table.AbstractFileStoreTable
 
 import org.apache.spark.paimon.Utils
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 1bf21a8f3..a54e7d50e 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -18,9 +18,9 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.WriteMode._
+import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
-import org.scalactic.source.Position
 
 import java.sql.Date
 
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
index f84fd3b27..8c638cbcd 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTest.scala
@@ -18,6 +18,7 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.WriteMode.CHANGE_LOG
+import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 5877b0cfc..16d0d797a 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -18,7 +18,7 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.WriteMode.CHANGE_LOG
-import org.apache.paimon.spark.PaimonSparkSessionExtension
+import org.apache.paimon.spark.{PaimonSparkSessionExtension, 
PaimonSparkTestBase}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, Row}

Reply via email to