This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new adc4a3dd600 [SPARK-43144] Scala Client DataStreamReader table() API
adc4a3dd600 is described below
commit adc4a3dd600f83283f67f32f7bf9de21c2b51593
Author: Wei Liu <[email protected]>
AuthorDate: Tue Apr 25 14:20:50 2023 +0800
[SPARK-43144] Scala Client DataStreamReader table() API
### What changes were proposed in this pull request?
Add the `table()` API for the Scala client.
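A minimal usage sketch, based on the new unit test in `StreamingQuerySuite` (the table and sink names here are illustrative):
```scala
// Define a streaming DataFrame on an existing table; reader options
// are forwarded to the underlying source.
val df = spark.readStream
  .options(Map("p1" -> "v1", "p2" -> "v2"))
  .table("my_table")

// Stream the table's rows into an in-memory sink.
val query = df.writeStream
  .format("memory")
  .queryName("my_sink")
  .start()
```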
### Why are the changes needed?
Continuation of Structured Streaming (SS) support in Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes, this adds a new `DataStreamReader.table()` API to the Scala Spark Connect client.
### How was this patch tested?
Unit tests.
I also changed `ProtoToParsedPlanTestSuite` slightly to strip memory addresses from the compared plan strings; before the change, the streaming table test would fail with:
```
- streaming_table_API_with_options *** FAILED *** (8 milliseconds)
[info] Expected and actual plans do not match:
[info]
[info] === Expected Plan ===
[info] SubqueryAlias primary.tempdb.myStreamingTable
[info] +- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable@752725d9, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog@347d8e2a, tempdb.myStreamingTable
[info]
[info]
[info] === Actual Plan ===
[info] SubqueryAlias primary.tempdb.myStreamingTable
[info] +- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable@a88a5db, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog@2c6b362e, tempdb.myStreamingTable
```
The memory address (`InMemoryTable@752725d9`) differs on every run, so the test suite now strips these addresses before comparing plans.
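The normalization is the one-line regex helper added to `ProtoToParsedPlanTestSuite` (see the diff below); for reference:
```scala
// Replaces identity-hash suffixes such as "@752725d9," with "," so that
// golden-file plan comparisons are stable across JVM runs.
private def removeMemoryAddress(expr: String): String = {
  expr.replaceAll("@[0-9a-f]+,", ",")
}
```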
I also verified that memory addresses don't appear in the existing explain files:
```
wei.liu:~/oss-spark$ cat connector/connect/common/src/test/resources/query-tests/explain-results/* | grep @
wei.liu:~/oss-spark$
```
Closes #40887 from WweiL/SPARK-43144-scala-table-api.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../spark/sql/streaming/DataStreamReader.scala | 18 +++++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 5 +++
.../spark/sql/streaming/StreamingQuerySuite.scala | 36 +++++++++++++++++++++
.../streaming_table_API_with_options.explain | 2 ++
.../queries/streaming_table_API_with_options.json | 14 ++++++++
.../streaming_table_API_with_options.proto.bin | Bin 0 -> 55 bytes
.../sql/connect/ProtoToParsedPlanTestSuite.scala | 14 ++++++--
7 files changed, 87 insertions(+), 2 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f6f41257417..4b0b99dd787 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -217,6 +217,24 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
*/
def parquet(path: String): DataFrame = format("parquet").load(path)
+ /**
+ * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+ * support streaming mode.
+ * @param tableName
+ * The name of the table
+ * @since 3.5.0
+ */
+ def table(tableName: String): DataFrame = {
+ require(tableName != null, "The table name can't be null")
+ sparkSession.newDataFrame { builder =>
+ builder.getReadBuilder
+ .setIsStreaming(true)
+ .getNamedTableBuilder
+ .setUnparsedIdentifier(tableName)
+ .putAllOptions(sourceBuilder.getOptionsMap)
+ }
+ }
+
/**
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any. The text files must be encoded
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 3b1537ce755..36ef3a8dbf8 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2170,6 +2170,11 @@ class PlanGenerationTestSuite
session.read.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myTable")
}
+ /* Stream Reader API */
+ test("streaming table API with options") {
+ session.readStream.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myStreamingTable")
+ }
+
/* Avro functions */
test("from_avro with options") {
binary.select(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3659c0a5157..9061dcadd63 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -78,4 +78,40 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
}
}
}
+
+ test("Streaming table API") {
+ withSQLConf(
+ "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+ ) {
+ spark.sql("DROP TABLE IF EXISTS my_table")
+
+ withTempPath { ckpt =>
+ val q1 = spark.readStream
+ .format("rate")
+ .load()
+ .writeStream
+ .option("checkpointLocation", ckpt.getCanonicalPath)
+ .toTable("my_table")
+
+ val q2 = spark.readStream
+ .table("my_table")
+ .writeStream
+ .format("memory")
+ .queryName("my_sink")
+ .start()
+
+ try {
+ q1.processAllAvailable()
+ q2.processAllAvailable()
+ eventually(timeout(10.seconds)) {
+ assert(spark.table("my_sink").count() > 0)
+ }
+ } finally {
+ q1.stop()
+ q2.stop()
+ spark.sql("DROP TABLE my_table")
+ }
+ }
+ }
+ }
}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
new file mode 100644
index 00000000000..2a20daaefa8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
@@ -0,0 +1,2 @@
+SubqueryAlias primary.tempdb.myStreamingTable
++- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json
new file mode 100644
index 00000000000..bcfb885aed8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json
@@ -0,0 +1,14 @@
+{
+ "common": {
+ "planId": "0"
+ },
+ "read": {
+ "namedTable": {
+ "unparsedIdentifier": "tempdb.myStreamingTable",
+ "options": {
+ "p1": "v1",
+ "p2": "v2"
+ }
+ },
+ "isStreaming": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin
new file mode 100644
index 00000000000..baa14c6b4a6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin differ
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 7071e5300d8..64cbfbb6952 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -122,6 +122,11 @@ class ProtoToParsedPlanTestSuite
new StructType().add("id", "long"),
Array.empty[Transform],
emptyProps)
+ inMemoryCatalog.createTable(
+ Identifier.of(Array("tempdb"), "myStreamingTable"),
+ new StructType().add("id", "long"),
+ Array.empty[Transform],
+ emptyProps)
val catalogManager = new CatalogManager(
inMemoryCatalog,
@@ -160,7 +165,8 @@ class ProtoToParsedPlanTestSuite
val planner = new SparkConnectPlanner(spark)
val catalystPlan =
analyzer.executeAndCheck(planner.transformRelation(relation), new QueryPlanningTracker)
- val actual = normalizeExprIds(ReplaceExpressions(catalystPlan)).treeString
+ val actual =
+ removeMemoryAddress(normalizeExprIds(ReplaceExpressions(catalystPlan)).treeString)
val goldenFile = goldenFilePath.resolve(relativePath).getParent.resolve(name + ".explain")
Try(readGoldenFile(goldenFile)) match {
case Success(expected) if expected == actual => // Test passes.
@@ -188,6 +194,10 @@ class ProtoToParsedPlanTestSuite
}
}
+ private def removeMemoryAddress(expr: String): String = {
+ expr.replaceAll("@[0-9a-f]+,", ",")
+ }
+
private def readRelation(path: Path): proto.Relation = {
val input = Files.newInputStream(path)
try proto.Relation.parseFrom(input)
@@ -197,7 +207,7 @@ class ProtoToParsedPlanTestSuite
}
private def readGoldenFile(path: Path): String = {
- new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
+ removeMemoryAddress(new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
}
private def writeGoldenFile(path: Path, value: String): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]