This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new adc4a3dd600 [SPARK-43144] Scala Client DataStreamReader table() API
adc4a3dd600 is described below

commit adc4a3dd600f83283f67f32f7bf9de21c2b51593
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Tue Apr 25 14:20:50 2023 +0800

    [SPARK-43144] Scala Client DataStreamReader table() API
    
    ### What changes were proposed in this pull request?
    
    Add the table() API for the Scala client.
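
    A minimal usage sketch of the new API (the table name `my_table` and the
    sink query name `my_sink` are illustrative; the table's data source must
    support streaming reads):

    ```
    // Read an existing table as a streaming DataFrame and write it to an
    // in-memory sink; options set on the reader are forwarded to the source.
    val query = spark.readStream
      .option("p1", "v1")
      .table("my_table")
      .writeStream
      .format("memory")
      .queryName("my_sink")
      .start()

    query.processAllAvailable()
    query.stop()
    ```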
    
    ### Why are the changes needed?
    
    Continuation of Structured Streaming (SS) Connect development.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. It adds the `DataStreamReader.table()` API to the Spark Connect Scala client.
    
    ### How was this patch tested?
    
    Unit tests (see `StreamingQuerySuite` and `PlanGenerationTestSuite`).
    
    I also changed `ProtoToParsedPlanTestSuite` a little to remove the memory addresses.
    Before the change, the test for the streaming table would fail with:
    ```
    - streaming_table_API_with_options *** FAILED *** (8 milliseconds)
    [info]   Expected and actual plans do not match:
    [info]
    [info]   === Expected Plan ===
    [info]   SubqueryAlias primary.tempdb.myStreamingTable
    [info]   +- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable752725d9, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog347d8e2a, tempdb.myStreamingTable
    [info]
    [info]
    [info]   === Actual Plan ===
    [info]   SubqueryAlias primary.tempdb.myStreamingTable
    [info]   +- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTablea88a5db, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog2c6b362e, tempdb.myStreamingTable
    ```
    This is because the memory address (`InMemoryTable752725d9`) is different every time the test runs, so I removed these in the test suite.
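
    The normalization itself is a one-line regex that strips the `@<hex>` hash
    suffix before comparing plans; a minimal sketch (the sample line is
    illustrative and assumes the hash renders as `@752725d9`):

    ```
    // Hedged sketch of the golden-file normalization: drop an object's
    // "@<hex hash>" suffix (when followed by a comma) so plan strings
    // compare identically across runs.
    def removeMemoryAddress(expr: String): String =
      expr.replaceAll("@[0-9a-f]+,", ",")

    val line = "StreamingRelationV2 org.apache.spark.sql.connector.catalog.InMemoryTable@752725d9, [p1=v1, p2=v2]"
    println(removeMemoryAddress(line))
    // StreamingRelationV2 org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2]
    ```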
    
    I also verified that memory addresses do not appear in the existing explain files:
    ```
    wei.liu:~/oss-spark$ cat connector/connect/common/src/test/resources/query-tests/explain-results/* | grep
    wei.liu:~/oss-spark$
    ```
    
    Closes #40887 from WweiL/SPARK-43144-scala-table-api.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../spark/sql/streaming/DataStreamReader.scala     |  18 +++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   5 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  36 +++++++++++++++++++++
 .../streaming_table_API_with_options.explain       |   2 ++
 .../queries/streaming_table_API_with_options.json  |  14 ++++++++
 .../streaming_table_API_with_options.proto.bin     | Bin 0 -> 55 bytes
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |  14 ++++++--
 7 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f6f41257417..4b0b99dd787 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -217,6 +217,24 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
    */
   def parquet(path: String): DataFrame = format("parquet").load(path)
 
+  /**
+   * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+   * support streaming mode.
+   * @param tableName
+   *   The name of the table
+   * @since 3.5.0
+   */
+  def table(tableName: String): DataFrame = {
+    require(tableName != null, "The table name can't be null")
+    sparkSession.newDataFrame { builder =>
+      builder.getReadBuilder
+        .setIsStreaming(true)
+        .getNamedTableBuilder
+        .setUnparsedIdentifier(tableName)
+        .putAllOptions(sourceBuilder.getOptionsMap)
+    }
+  }
+
   /**
    * Loads text files and returns a `DataFrame` whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any. The text files must be encoded
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 3b1537ce755..36ef3a8dbf8 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2170,6 +2170,11 @@ class PlanGenerationTestSuite
     session.read.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myTable")
   }
 
+  /* Stream Reader API  */
+  test("streaming table API with options") {
+    session.readStream.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myStreamingTable")
+  }
+
   /* Avro functions */
   test("from_avro with options") {
     binary.select(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3659c0a5157..9061dcadd63 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -78,4 +78,40 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       }
     }
   }
+
+  test("Streaming table API") {
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      spark.sql("DROP TABLE IF EXISTS my_table")
+
+      withTempPath { ckpt =>
+        val q1 = spark.readStream
+          .format("rate")
+          .load()
+          .writeStream
+          .option("checkpointLocation", ckpt.getCanonicalPath)
+          .toTable("my_table")
+
+        val q2 = spark.readStream
+          .table("my_table")
+          .writeStream
+          .format("memory")
+          .queryName("my_sink")
+          .start()
+
+        try {
+          q1.processAllAvailable()
+          q2.processAllAvailable()
+          eventually(timeout(10.seconds)) {
+            assert(spark.table("my_sink").count() > 0)
+          }
+        } finally {
+          q1.stop()
+          q2.stop()
+          spark.sql("DROP TABLE my_table")
+        }
+      }
+    }
+  }
 }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
new file mode 100644
index 00000000000..2a20daaefa8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
@@ -0,0 +1,2 @@
+SubqueryAlias primary.tempdb.myStreamingTable
++- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json
new file mode 100644
index 00000000000..bcfb885aed8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json
@@ -0,0 +1,14 @@
+{
+  "common": {
+    "planId": "0"
+  },
+  "read": {
+    "namedTable": {
+      "unparsedIdentifier": "tempdb.myStreamingTable",
+      "options": {
+        "p1": "v1",
+        "p2": "v2"
+      }
+    },
+    "isStreaming": true
+  }
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin
new file mode 100644
index 00000000000..baa14c6b4a6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin differ
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 7071e5300d8..64cbfbb6952 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -122,6 +122,11 @@ class ProtoToParsedPlanTestSuite
       new StructType().add("id", "long"),
       Array.empty[Transform],
       emptyProps)
+    inMemoryCatalog.createTable(
+      Identifier.of(Array("tempdb"), "myStreamingTable"),
+      new StructType().add("id", "long"),
+      Array.empty[Transform],
+      emptyProps)
 
     val catalogManager = new CatalogManager(
       inMemoryCatalog,
@@ -160,7 +165,8 @@ class ProtoToParsedPlanTestSuite
       val planner = new SparkConnectPlanner(spark)
       val catalystPlan =
         analyzer.executeAndCheck(planner.transformRelation(relation), new QueryPlanningTracker)
-      val actual = normalizeExprIds(ReplaceExpressions(catalystPlan)).treeString
+      val actual =
+        removeMemoryAddress(normalizeExprIds(ReplaceExpressions(catalystPlan)).treeString)
       val goldenFile = goldenFilePath.resolve(relativePath).getParent.resolve(name + ".explain")
       Try(readGoldenFile(goldenFile)) match {
         case Success(expected) if expected == actual => // Test passes.
@@ -188,6 +194,10 @@ class ProtoToParsedPlanTestSuite
     }
   }
 
+  private def removeMemoryAddress(expr: String): String = {
+    expr.replaceAll("@[0-9a-f]+,", ",")
+  }
+
   private def readRelation(path: Path): proto.Relation = {
     val input = Files.newInputStream(path)
     try proto.Relation.parseFrom(input)
@@ -197,7 +207,7 @@ class ProtoToParsedPlanTestSuite
   }
 
   private def readGoldenFile(path: Path): String = {
-    new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
+    removeMemoryAddress(new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
   }
 
   private def writeGoldenFile(path: Path, value: String): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
