This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f406b54b2a8 [SPARK-44044][SS] Improve Error message for Window functions with streaming
f406b54b2a8 is described below
commit f406b54b2a899d03bae2e6f70eef7fedfed63d65
Author: Siying Dong <[email protected]>
AuthorDate: Sat Jul 1 08:51:22 2023 +0300
[SPARK-44044][SS] Improve Error message for Window functions with streaming
### What changes were proposed in this pull request?
Replace the existing error message thrown when a non-time window function is
used with streaming so that it names the offending window function and column.
The error message now looks like the following:
org.apache.spark.sql.AnalysisException: Window function is not supported in
'row_number()' as column 'rn_col' on streaming DataFrames/Datasets. Structured
Streaming only supports time-window aggregation using the `window` function.
(window specification: '(PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)')
Note that the message reads a little unnaturally because the existing unit test
requires that the exception message contain "not supported", "streaming",
"DataFrames", and "Dataset".
### Why are the changes needed?
The existing error message is vague and includes a full logical plan. A user
reported that they were unable to identify the problem from it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
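
For reference, assuming an sbt dev environment, the new test can be run on its
own with something like:

    build/sbt "sql/testOnly org.apache.spark.sql.streaming.StreamSuite -- -z SPARK-44044"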
Closes #41578 from siying/window_error.
Lead-authored-by: Siying Dong <[email protected]>
Co-authored-by: Siying Dong <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-classes.json | 5 ++++
.../analysis/UnsupportedOperationChecker.scala | 17 ++++++++++---
.../spark/sql/errors/QueryExecutionErrors.scala | 16 ++++++++++++-
.../analysis/UnsupportedOperationsSuite.scala | 24 ++++++++++++++-----
.../apache/spark/sql/streaming/StreamSuite.scala | 28 ++++++++++++++++++++++
5 files changed, 80 insertions(+), 10 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index eabd5533e13..14bd3bc6bac 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1775,6 +1775,11 @@
     ],
     "sqlState" : "42000"
   },
+  "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : {
+    "message" : [
+      "Window function is not supported in <windowFunc> (as column <columnName>) on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the WINDOW function. (window specification: <windowSpec>)"
+    ]
+  },
   "NOT_ALLOWED_IN_FROM" : {
     "message" : [
       "Not allowed in the FROM clause:"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index daa7c0d54b7..2a09d85d8f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
@@ -508,8 +509,18 @@ object UnsupportedOperationChecker extends Logging {
       case Sample(_, _, _, _, child) if child.isStreaming =>
         throwError("Sampling is not supported on streaming DataFrames/Datasets")

-      case Window(_, _, _, child) if child.isStreaming =>
-        throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+      case Window(windowExpression, _, _, child) if child.isStreaming =>
+        val (windowFuncList, columnNameList, windowSpecList) = windowExpression.flatMap { e =>
+          e.collect {
+            case we: WindowExpression =>
+              (we.windowFunction.toString, e.toAttribute.sql, we.windowSpec.sql)
+          }
+        }.unzip3
+        throw QueryExecutionErrors.nonTimeWindowNotSupportedInStreamingError(
+          windowFuncList,
+          columnNameList,
+          windowSpecList,
+          subPlan.origin)

       case ReturnAnswer(child) if child.isStreaming =>
         throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 59b66bd4343..74c29cabbe1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
-import org.apache.spark.sql.catalyst.trees.{SQLQueryContext, TreeNode}
+import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext, TreeNode}
 import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, DateTimeUtils, FailFastMode}
 import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("className" -> className, "operator" -> operator))
   }
+  def nonTimeWindowNotSupportedInStreamingError(
+      windowFuncList: Seq[String],
+      columnNameList: Seq[String],
+      windowSpecList: Seq[String],
+      origin: Origin): AnalysisException = {
+    new AnalysisException(
+      errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      messageParameters = Map(
+        "windowFunc" -> windowFuncList.map(toSQLStmt(_)).mkString(","),
+        "columnName" -> columnNameList.map(toSQLId(_)).mkString(","),
+        "windowSpec" -> windowSpecList.map(toSQLStmt(_)).mkString(",")),
+      origin = origin)
+  }
+
   def multiplePathsSpecifiedError(allPaths: Seq[String]): SparkIllegalArgumentException = {
     new SparkIllegalArgumentException(
       errorClass = "_LEGACY_ERROR_TEMP_2050",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index f9fd02b86e9..32c9a3aa17e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -738,7 +738,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling")
   testUnaryOperatorInStreamingPlan(
-    "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+    "window",
+    Window(Nil, Nil, Nil, _),
+    errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING")

   // Output modes with aggregation and non-aggregation plans
   testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
@@ -870,7 +872,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       operationName: String,
       logicalPlanGenerator: LogicalPlan => LogicalPlan,
       outputMode: OutputMode = Append,
-      expectedMsg: String = ""): Unit = {
+      expectedMsg: String = "",
+      errorClass: String = ""): Unit = {

     val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
@@ -878,7 +881,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       s"$operationName with stream relation",
       wrapInStreaming(logicalPlanGenerator(streamRelation)),
       outputMode,
-      expectedMsgs)
+      expectedMsgs,
+      errorClass)

     assertSupportedInStreamingPlan(
       s"$operationName with batch relation",
@@ -1025,10 +1029,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       name: String,
       plan: LogicalPlan,
       outputMode: OutputMode,
-      expectedMsgs: Seq[String]): Unit = {
+      expectedMsgs: Seq[String],
+      errorClass: String = ""): Unit = {
     testError(
       s"streaming plan - $name: not supported",
-      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported") {
+      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported",
+      errorClass) {
       UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
     }
   }
@@ -1090,7 +1096,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
    * Test whether the body of code will fail. If it does fail, then check if it has expected
    * messages.
    */
-  def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = {
+  def testError(
+      testName: String,
+      expectedMsgs: Seq[String],
+      errorClass: String = "")(testBody: => Unit): Unit = {

     test(testName) {
       val e = intercept[AnalysisException] {
@@ -1102,6 +1111,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
             s"actual exception message:\n\t'${e.getMessage}'")
         }
       }
+      if (!errorClass.isEmpty) {
+        assert(e.getErrorClass == errorClass)
+      }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6fd63454e82..0ee44a098f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
@@ -686,6 +687,33 @@ class StreamSuite extends StreamTest {
     assert(query.exception.isEmpty)
   }

+  test("SPARK-44044: non-time-window") {
+    val inputData = MemoryStream[(Int, Int)]
+    val e = intercept[AnalysisException] {
+      val agg = inputData
+        .toDF()
+        .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
+        .withWatermark("col1", "10 seconds")
+        .withColumn("rn_col", row_number().over(Window
+          .partitionBy("col1")
+          .orderBy(col("col2"))))
+        .select("rn_col", "col1", "col2")
+        .writeStream
+        .format("console")
+        .start()
+    }
+    checkError(
+      e,
+      "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      parameters = Map(
+        "windowFunc" -> "ROW_NUMBER()",
+        "columnName" -> "`rn_col`",
+        "windowSpec" ->
+          ("(PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING " +
+          "AND CURRENT ROW)")))
+  }
+
+
   test("SPARK-19873: streaming aggregation with change in number of partitions") {
     val inputData = MemoryStream[(Int, Int)]
     val agg = inputData.toDS().groupBy("_1").count()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]