This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new adc4a3dd600 [SPARK-43144] Scala Client DataStreamReader table() API
adc4a3dd600 is described below

commit adc4a3dd600f83283f67f32f7bf9de21c2b51593
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Tue Apr 25 14:20:50 2023 +0800

    [SPARK-43144] Scala Client DataStreamReader table() API

    ### What changes were proposed in this pull request?

    Add the `table()` API for the Scala client.

    ### Why are the changes needed?

    Continuation of Structured Streaming Connect development.

    ### Does this PR introduce _any_ user-facing change?

    Yes

    ### How was this patch tested?

    Unit test. I also changed `ProtoToParsedPlanTestSuite` a little to remove
    memory addresses from plan strings; before the change, the test for the
    streaming table would fail with:

    ```
    - streaming_table_API_with_options *** FAILED *** (8 milliseconds)
    [info]   Expected and actual plans do not match:
    [info]
    [info]   === Expected Plan ===
    [info]   SubqueryAlias primary.tempdb.myStreamingTable
    [info]   +- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable@752725d9, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog@347d8e2a, tempdb.myStreamingTable
    [info]
    [info]
    [info]   === Actual Plan ===
    [info]   SubqueryAlias primary.tempdb.myStreamingTable
    [info]   +- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable@a88a5db, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog@2c6b362e, tempdb.myStreamingTable
    ```

    The memory address (e.g. `InMemoryTable@752725d9`) is different on every
    run, so these addresses are now stripped in the test suite. I also verified
    that memory addresses don't appear in the existing explain files:

    ```
    wei.liu:~/oss-spark$ cat connector/connect/common/src/test/resources/query-tests/explain-results/* | grep @
    wei.liu:~/oss-spark$
    ```

    Closes #40887 from WweiL/SPARK-43144-scala-table-api.

    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
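For reference, a minimal sketch of how the new API can be invoked from the
Scala client (e.g. in a REPL with the connect client on the classpath). The
connect endpoint, table name, and option values are illustrative, and the
table's underlying data source must support streaming reads:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical Spark Connect endpoint; adjust to your environment.
val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

// Options set on the reader are forwarded to the named table.
val df = spark.readStream
  .options(Map("p1" -> "v1", "p2" -> "v2"))
  .table("tempdb.myStreamingTable")

// Any streaming sink works; console is used purely for illustration.
val query = df.writeStream.format("console").start()
query.awaitTermination()
```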
---
 .../spark/sql/streaming/DataStreamReader.scala     | 18 +++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  5 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 36 +++++++++++++++++++++
 .../streaming_table_API_with_options.explain       |  2 ++
 .../queries/streaming_table_API_with_options.json  | 14 ++++++++
 .../streaming_table_API_with_options.proto.bin     | Bin 0 -> 55 bytes
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   | 14 ++++++--
 7 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f6f41257417..4b0b99dd787 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -217,6 +217,24 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
    */
   def parquet(path: String): DataFrame = format("parquet").load(path)
 
+  /**
+   * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+   * support streaming mode.
+   * @param tableName
+   *   The name of the table
+   * @since 3.5.0
+   */
+  def table(tableName: String): DataFrame = {
+    require(tableName != null, "The table name can't be null")
+    sparkSession.newDataFrame { builder =>
+      builder.getReadBuilder
+        .setIsStreaming(true)
+        .getNamedTableBuilder
+        .setUnparsedIdentifier(tableName)
+        .putAllOptions(sourceBuilder.getOptionsMap)
+    }
+  }
+
   /**
    * Loads text files and returns a `DataFrame` whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any. The text files must be encoded
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 3b1537ce755..36ef3a8dbf8 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2170,6 +2170,11 @@ class PlanGenerationTestSuite
     session.read.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myTable")
   }
 
+  /* Stream Reader API */
+  test("streaming table API with options") {
+    session.readStream.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myStreamingTable")
+  }
+
   /* Avro functions */
   test("from_avro with options") {
     binary.select(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3659c0a5157..9061dcadd63 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -78,4 +78,40 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       }
     }
   }
+
+  test("Streaming table API") {
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      spark.sql("DROP TABLE IF EXISTS my_table")
+
+      withTempPath { ckpt =>
+        val q1 = spark.readStream
+          .format("rate")
+          .load()
+          .writeStream
+          .option("checkpointLocation", ckpt.getCanonicalPath)
+          .toTable("my_table")
+
+        val q2 = spark.readStream
+          .table("my_table")
+          .writeStream
+          .format("memory")
+          .queryName("my_sink")
+          .start()
+
+        try {
+          q1.processAllAvailable()
+          q2.processAllAvailable()
+          eventually(timeout(10.seconds)) {
+            assert(spark.table("my_sink").count() > 0)
+          }
+        } finally {
+          q1.stop()
+          q2.stop()
+          spark.sql("DROP TABLE my_table")
+        }
+      }
+    }
+  }
 }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
new file mode 100644
index 00000000000..2a20daaefa8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
@@ -0,0 +1,2 @@
+SubqueryAlias primary.tempdb.myStreamingTable
++- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json
new file mode 100644
index 00000000000..bcfb885aed8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.json
@@ -0,0 +1,14 @@
+{
+  "common": {
+    "planId": "0"
+  },
+  "read": {
+    "namedTable": {
+      "unparsedIdentifier": "tempdb.myStreamingTable",
+      "options": {
+        "p1": "v1",
+        "p2": "v2"
+      }
+    },
+    "isStreaming": true
+  }
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin
new file mode 100644
index 00000000000..baa14c6b4a6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/streaming_table_API_with_options.proto.bin differ
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 7071e5300d8..64cbfbb6952 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -122,6 +122,11 @@ class ProtoToParsedPlanTestSuite
       new StructType().add("id", "long"),
       Array.empty[Transform],
       emptyProps)
+    inMemoryCatalog.createTable(
+      Identifier.of(Array("tempdb"), "myStreamingTable"),
+      new StructType().add("id", "long"),
+      Array.empty[Transform],
+      emptyProps)
     val catalogManager = new CatalogManager(
       inMemoryCatalog,
@@ -160,7 +165,8 @@ class ProtoToParsedPlanTestSuite
     val planner = new SparkConnectPlanner(spark)
     val catalystPlan =
       analyzer.executeAndCheck(planner.transformRelation(relation), new QueryPlanningTracker)
-    val actual = normalizeExprIds(ReplaceExpressions(catalystPlan)).treeString
+    val actual =
+      removeMemoryAddress(normalizeExprIds(ReplaceExpressions(catalystPlan)).treeString)
     val goldenFile = goldenFilePath.resolve(relativePath).getParent.resolve(name + ".explain")
     Try(readGoldenFile(goldenFile)) match {
       case Success(expected) if expected == actual => // Test passes.
@@ -188,6 +194,10 @@ class ProtoToParsedPlanTestSuite
     }
   }
 
+  private def removeMemoryAddress(expr: String): String = {
+    expr.replaceAll("@[0-9a-f]+,", ",")
+  }
+
   private def readRelation(path: Path): proto.Relation = {
     val input = Files.newInputStream(path)
     try proto.Relation.parseFrom(input)
@@ -197,7 +207,7 @@ class ProtoToParsedPlanTestSuite
   }
 
   private def readGoldenFile(path: Path): String = {
-    new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
+    removeMemoryAddress(new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
   }
 
   private def writeGoldenFile(path: Path, value: String): Unit = {
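As a footnote, the normalization above is a plain regex rewrite. A
self-contained sketch of the same idea (the sample plan line below is made up
to mirror the failure shown in the PR description):

```scala
// Standalone sketch of the memory-address normalization used in the test suite.
object RemoveMemoryAddressDemo extends App {
  // Same regex as ProtoToParsedPlanTestSuite.removeMemoryAddress.
  private def removeMemoryAddress(expr: String): String =
    expr.replaceAll("@[0-9a-f]+,", ",")

  // Made-up treeString fragment containing a JVM identity hash after '@'.
  val line =
    "+- StreamingRelationV2 t, org.apache.spark.sql.connector.catalog.InMemoryTable@752725d9, [p1=v1], [id#0L]"

  // Only "@<hex>," sequences are stripped, so the address vanishes while the
  // surrounding plan text is left untouched.
  println(removeMemoryAddress(line))
  // +- StreamingRelationV2 t, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1], [id#0L]
}
```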