This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new e87d166a81c [SPARK-46062][SQL] Sync the isStreaming flag between CTE
definition and reference
e87d166a81c is described below
commit e87d166a81c620c15cc94dfb15c17c9cacbbc9b6
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Nov 23 22:32:16 2023 +0900
[SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and
reference
This PR proposes to sync the flag `isStreaming` from CTE definition to CTE
reference.
The essential issue is that the CTE reference node cannot determine the flag
`isStreaming` by itself: because it has no constructor parameter for the flag,
it can never hold a proper value and always takes the default. The other
flag `resolved` is already handled, and we need to do the same for `isStreaming`.
Once we add the parameter to the constructor, we also need to make sure the
flag stays in sync with the CTE definition. The rule `ResolveWithCTE` already
performs this sync, hence we add the logic to sync the flag `isStreaming` there
as well.
The bug may impact rules which behave differently depending on the
`isStreaming` flag. It is no longer a problem once the CTE reference is
replaced with the CTE definition at some point in the optimization phase, but
every analyzer and optimizer rule triggered before that replacement takes
effect may misbehave based on the incorrect `isStreaming` flag.
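Does this PR introduce any user-facing change? No.
How was this patch tested? New UT.
Was this patch authored or co-authored using generative AI tooling? No.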
Closes #43966 from HeartSaVioR/SPARK-46062.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 43046631a5d4ac7201361a00473cc87fa52ab5a7)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/catalyst/analysis/CTESubstitution.scala | 2 +-
.../sql/catalyst/analysis/ResolveWithCTE.scala | 2 +-
.../catalyst/optimizer/MergeScalarSubqueries.scala | 3 +-
...ushdownPredicatesAndPruneColumnsForCTEDef.scala | 2 +-
.../plans/logical/basicLogicalOperators.scala | 1 +
.../sql/catalyst/analysis/AnalysisSuite.scala | 15 +++++++
.../optimizer/MergeScalarSubqueriesSuite.scala | 3 +-
.../spark/sql/streaming/StreamingQuerySuite.scala | 47 +++++++++++++++++++++-
8 files changed, 69 insertions(+), 6 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 77c687843c3..f047483b20f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -261,7 +261,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
d.child
} else {
// Add a `SubqueryAlias` for hint-resolving rules to match
relation names.
- SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
+ SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output,
d.isStreaming))
}
}.getOrElse(u)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
index 78b776f12f0..f1077378b2d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala
@@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
case ref: CTERelationRef if !ref.resolved =>
cteDefMap.get(ref.cteId).map { cteDef =>
- CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output)
+ CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output,
cteDef.isStreaming)
}.getOrElse {
ref
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
index 6184160829b..ff0bc5e66d7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
@@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
GetStructField(
ScalarSubquery(
- CTERelationRef(subqueryCTE.id, _resolved = true,
subqueryCTE.output),
+ CTERelationRef(subqueryCTE.id, _resolved = true,
subqueryCTE.output,
+ subqueryCTE.isStreaming),
exprId = ssr.exprId),
ssr.headerIndex)
} else {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
index f351ba0b39a..41859673616 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
@@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends
Rule[LogicalPlan] {
cteDef
}
- case cteRef @ CTERelationRef(cteId, _, output, _) =>
+ case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
val (cteDef, _, _, newAttrSet) = cteMap(cteId)
if (newAttrSet.size < output.size) {
val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b5a2f097424..d775b72a5da 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -837,6 +837,7 @@ case class CTERelationRef(
cteId: Long,
_resolved: Boolean,
override val output: Seq[Attribute],
+ override val isStreaming: Boolean,
statsOpt: Option[Statistics] = None) extends LeafNode with
MultiInstanceRelation {
final override val nodePatterns: Seq[TreePattern] = Seq(CTE)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 9d51c41a6d8..8d9f9abc00b 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -1466,4 +1466,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
// EventTimeWatermark node is NOT eliminated.
assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
}
+
+ test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE
reference") {
+ val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts"))
+ // Intentionally marking the flag _resolved to false, so that analyzer has
a chance to sync
+ // the flag isStreaming on syncing the flag _resolved.
+ val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming
= false)
+ val plan = WithCTE(cteRef, Seq(cteDef)).analyze
+
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ assert(refs.length == 1)
+ assert(refs.head.resolved)
+ assert(refs.head.isStreaming)
+ }
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
index 8af0e02855b..13e13841478 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
@@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest {
}
private def extractorExpression(cteIndex: Int, output: Seq[Attribute],
fieldIndex: Int) = {
- GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true,
output)), fieldIndex)
+ GetStructField(ScalarSubquery(
+ CTERelationRef(cteIndex, _resolved = true, output, isStreaming =
false)), fieldIndex)
.as("scalarsubquery()")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b889ac18974..da9b579b397 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn,
Shuffle, Uuid}
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
CTERelationRef, LocalRelation}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1317,6 +1317,51 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
}
}
+ test("SPARK-46062: streaming query reading from CTE, which refers to temp
view from " +
+ "streaming source") {
+ val inputStream = MemoryStream[Int]
+ inputStream.toDF().createOrReplaceTempView("tv")
+
+ val df = spark.sql(
+ """
+ |WITH w as (
+ | SELECT * FROM tv
+ |)
+ |SELECT value from w
+ |""".stripMargin)
+
+ testStream(df)(
+ AddData(inputStream, 1, 2, 3),
+ CheckAnswer(1, 2, 3),
+ Execute { q =>
+ var isStreamingForCteDef: Option[Boolean] = None
+ var isStreamingForCteRef: Option[Boolean] = None
+
+ q.analyzedPlan.foreach {
+ case d: CTERelationDef =>
+ assert(d.resolved, "The definition node must be resolved after
analysis.")
+ isStreamingForCteDef = Some(d.isStreaming)
+
+ case d: CTERelationRef =>
+ assert(d.resolved, "The reference node must be marked as resolved
after analysis.")
+ isStreamingForCteRef = Some(d.isStreaming)
+
+ case _ =>
+ }
+
+ assert(isStreamingForCteDef.isDefined &&
isStreamingForCteRef.isDefined,
+ "Both definition and reference for CTE should be available in
analyzed plan.")
+
+ assert(isStreamingForCteDef.get, "Expected isStreaming=true for CTE
definition, but " +
+ "isStreaming is set to false.")
+
+ assert(isStreamingForCteDef === isStreamingForCteRef,
+ "isStreaming flag should be carried over from definition to
reference, " +
+ s"definition: ${isStreamingForCteDef.get}, reference:
${isStreamingForCteRef.get}.")
+ }
+ )
+ }
+
private def checkExceptionMessage(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]