This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c09039ad2a95 [SPARK-48597][SQL] Introduce a marker for isStreaming property in text representation of logical plan
c09039ad2a95 is described below
commit c09039ad2a9514bb37148c0c6e8ecfe7afabfa1d
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Jun 17 08:18:34 2024 +0800
[SPARK-48597][SQL] Introduce a marker for isStreaming property in text representation of logical plan
### What changes were proposed in this pull request?
This PR proposes to introduce a marker for the isStreaming property in the text
representation of a logical plan.
The marker is `~`, alongside the existing `!` (invalid) and `'` (unresolved) markers.
This PR keeps the prefix marker a single character (as opposed to allowing up
to two characters). This should be fine in practice: the streaming marker is
mostly useful when inspecting a plan that has already been analyzed, so it is
unlikely that we would need to see one of the existing markers and the streaming
marker on the same node.
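The precedence described above (unresolved first, then any marker contributed by an ancestor class such as `!`, and `~` only when nothing else applies) can be sketched in isolation. This is a minimal Scala model, not the Catalyst code: `PrefixSketch`, `NodeState`, and its boolean fields are hypothetical stand-ins for values that `LogicalPlan` computes itself.

```scala
object PrefixSketch {
  // Hypothetical, simplified node state. The real LogicalPlan derives these
  // values internally instead of receiving them as parameters.
  final case class NodeState(resolved: Boolean, invalid: Boolean, isStreaming: Boolean)

  // Stand-in for the marker an ancestor class contributes ("!" for invalid).
  private def superPrefix(n: NodeState): String = if (n.invalid) "!" else ""

  // Single-character prefix: unresolved (') wins; otherwise any ancestor
  // marker is kept; "~" is used only when no other marker applies.
  def statePrefix(n: NodeState): String =
    if (!n.resolved) "'"
    else {
      val fromSuper = superPrefix(n)
      if (fromSuper.isEmpty && n.isStreaming) "~" else fromSuper
    }
}
```

With this ordering, an unresolved streaming node shows only `'`. That is the single-character trade-off described above.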
### Why are the changes needed?
This makes it much easier to track down query optimizer (QO) issues in
streaming queries. For example, here is the rule application that triggered
[SPARK-47305](https://issues.apache.org/jira/browse/SPARK-47305):
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
 WriteToMicroBatchDataSource MemorySink, 49358516-bb57-4f10-badf-00c9f5e2484b, Append, 1   WriteToMicroBatchDataSource MemorySink, 49358516-bb57-4f10-badf-00c9f5e2484b, Append, 1
 +- Project [value#45]                                                                     +- Project [value#45]
    +- Join Inner                                                                             +- Join Inner
       :- Project [value#45]                                                                     :- Project [value#45]
       :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource                  :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource
       +- Project                                                                                +- Project
!         +- Filter false                                                                           +- LocalRelation <empty>, [id#54L]
!            +- Range (1, 5, step=1, splits=Some(2))
```
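Judging from the output above, the rule log prints the plan before the rule on the left and the plan after it on the right. A leading `!` flags a line pair whose two sides differ; it is not the `!` invalid marker. The following Scala sketch reproduces that layout, assuming a fixed three-space gap after the widest left-hand line. `SideBySideSketch` is a hypothetical name, not a Spark API.

```scala
object SideBySideSketch {
  // Pairs old and new plan lines, padding the old side to a common width.
  // A pair whose two sides differ is prefixed with "!", otherwise with " ".
  def sideBySide(oldLines: Seq[String], newLines: Seq[String]): Seq[String] = {
    val width = if (oldLines.isEmpty) 0 else oldLines.map(_.length).max
    val n = math.max(oldLines.size, newLines.size)
    oldLines.padTo(n, "").zip(newLines.padTo(n, "")).map { case (l, r) =>
      val mark = if (l == r) " " else "!"
      // Trailing spaces (when the new side is empty) are trimmed for readability.
      (mark + l + " " * (width - l.length + 3) + r).replaceAll("\\s+$", "")
    }
  }
}
```

Because the new plan above is one line shorter, the `Range` line has no right-hand counterpart and is flagged with `!` as well.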
The bug in SPARK-47305 was that the LocalRelation above was incorrectly marked
as `streaming=true` when it should have been `streaming=false`. The text
representation of LocalRelation does not include the isStreaming flag, so the
text plan gives no hint that the rule had a bug. Even if LocalRelation did show
its isStreaming value, subtrees can be very deep in practice, and going all the
way down to the leaf nodes to figure out the isStreaming value is not friendly [...]
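To see why inspecting leaves is painful, here is a toy Scala model. It assumes, as the plans in this PR suggest, that an inner node is streaming whenever any of its children is. `Plan`, `Leaf`, `Inner`, and `pathToStreamingLeaf` are hypothetical names for this sketch only. Without a per-node marker, finding where the flag comes from means walking down to a leaf.

```scala
object StreamingFlagSketch {
  // Hypothetical toy plan: only leaves carry an explicit streaming flag.
  sealed trait Plan {
    def name: String
    def isStreaming: Boolean
  }
  final case class Leaf(name: String, streaming: Boolean) extends Plan {
    def isStreaming: Boolean = streaming
  }
  final case class Inner(name: String, children: Seq[Plan]) extends Plan {
    // Assumption: an inner node is streaming if any child is streaming.
    def isStreaming: Boolean = children.exists(_.isStreaming)
  }

  // Returns the node names from `root` down to the first streaming leaf,
  // i.e. how far one has to dig when only the leaves reveal the flag.
  def pathToStreamingLeaf(root: Plan): Option[List[String]] = root match {
    case Leaf(name, streaming) =>
      if (streaming) Some(List(name)) else None
    case Inner(name, children) =>
      children.iterator
        .map(pathToStreamingLeaf)
        .collectFirst { case Some(path) => name :: path }
  }
}
```

A single leaf wrongly marked as streaming, like the `LocalRelation` above, flips the flag of every ancestor on its path.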
After this PR, the same rule application is logged as follows:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
 ~WriteToMicroBatchDataSource MemorySink, dcf936d0-8580-47e9-b898-4fffffc21db5, Append, 1   ~WriteToMicroBatchDataSource MemorySink, dcf936d0-8580-47e9-b898-4fffffc21db5, Append, 1
 +- ~Project [value#45]                                                                     +- ~Project [value#45]
    +- ~Join Inner                                                                             +- ~Join Inner
       :- ~Project [value#45]                                                                     :- ~Project [value#45]
       :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource                   :  +- StreamingDataSourceV2ScanRelation[value#45] MemoryStreamDataSource
!      +- Project                                                                                 +- ~Project
!         +- Filter false                                                                            +- ~LocalRelation <empty>, [id#54L]
!            +- Range (1, 5, step=1, splits=Some(2))
```
Now it's obvious that the isStreaming flag of the leaf node has changed. Also, to
check the isStreaming flag of the Join's children, we only need to look at the
first node of each child subtree instead of going down to the leaf nodes.
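A toy renderer makes the benefit concrete. If every streaming node carries `~`, the first line of each child subtree already tells which side of a join is streaming. The Scala sketch below is hypothetical (`TreeStringSketch` and `Node` are not Spark classes) and only mimics the `+- ` / `:- ` connectors seen in the plans above.

```scala
object TreeStringSketch {
  // Hypothetical toy node: leaves carry an explicit streaming flag, and inner
  // nodes are streaming if any child is (an assumption for this sketch).
  final case class Node(label: String, children: List[Node] = Nil, leafStreaming: Boolean = false) {
    def isStreaming: Boolean =
      if (children.isEmpty) leafStreaming else children.exists(_.isStreaming)
  }

  // Renders the tree with "+- " / ":- " connectors and prefixes the label of
  // every streaming node with "~".
  def treeString(root: Node): String = {
    val sb = new StringBuilder
    def go(n: Node, indent: String, branch: String): Unit = {
      val marker = if (n.isStreaming) "~" else ""
      sb.append(indent).append(branch).append(marker).append(n.label).append('\n')
      // Descendants of a non-last child keep a ":" guide column.
      val childIndent = indent + (branch match {
        case ""    => ""
        case ":- " => ":  "
        case _     => "   "
      })
      n.children.zipWithIndex.foreach { case (c, i) =>
        go(c, childIndent, if (i == n.children.size - 1) "+- " else ":- ")
      }
    }
    go(root, "", "")
    sb.toString
  }
}
```

For a join whose left child reads a stream and whose right child is a batch `LocalRelation`, the output contains the lines `:- ~Project` and `+- Project`. Those two lines alone show that only the left side is streaming. With the buggy `LocalRelation`, the right-hand line would read `+- ~Project` instead.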
### Does this PR introduce _any_ user-facing change?
Yes, the textual representation of the logical plan changes slightly.
However, the change only applies to streaming Datasets, and the textual
representation of a logical plan is arguably not a public API. (Keeping the text
backward compatible is technically very hard.)
### How was this patch tested?
Existing UTs serve as regression tests for batch and streaming queries. For
streaming queries, this PR updates the golden file to match the change.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46953 from HeartSaVioR/SPARK-48597.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../streaming_table_API_with_options.explain | 4 ++--
.../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 15 ++++++++++++++-
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
index 2a20daaefa8c..2cc166efa99e 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
@@ -1,2 +1,2 @@
-SubqueryAlias primary.tempdb.myStreamingTable
-+- StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
+~SubqueryAlias primary.tempdb.myStreamingTable
++- ~StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index a2ede8ac735c..938a8ffe9e44 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -104,7 +104,20 @@ abstract class LogicalPlan
    */
   lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
 
-  override protected def statePrefix = if (!resolved) "'" else super.statePrefix
+  override protected def statePrefix = {
+    if (!resolved) {
+      "'"
+    } else {
+      val prefixFromSuper = super.statePrefix
+      // Ancestor class could mark something on the prefix, including 'invalid'. Add a marker for
+      // `streaming` only when there is no marker from ancestor class.
+      if (prefixFromSuper.isEmpty && isStreaming) {
+        "~"
+      } else {
+        prefixFromSuper
+      }
+    }
+  }
 
   /**
    * Returns true if all its children of this query plan have been resolved.