This is an automated email from the ASF dual-hosted git repository.
jtorres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebff5b [SPARK-28223][SS] stream-stream joins should fail unsupported
checker in update mode
4ebff5b is described below
commit 4ebff5b6d68f26cc1ff9265a5489e0d7c2e05449
Author: Jose Torres <[email protected]>
AuthorDate: Tue Jul 2 09:59:11 2019 -0700
[SPARK-28223][SS] stream-stream joins should fail unsupported checker in
update mode
## What changes were proposed in this pull request?
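Right now, stream-stream joins in a non-Append output mode fail the unsupported
operation check only for inner joins, because the check was implemented when
inner join was the only supported stream-stream join type. This patch moves the
check ahead of the join-type match so that it applies to every join type.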
## How was this patch tested?
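New unit tests in UnsupportedOperationsSuite covering stream-stream left outer
joins in Update and Complete output modes.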
Closes #25023 from jose-torres/changevalidation.
Authored-by: Jose Torres <[email protected]>
Signed-off-by: Jose Torres <[email protected]>
---
.../catalyst/analysis/UnsupportedOperationChecker.scala | 11 +++++------
.../catalyst/analysis/UnsupportedOperationsSuite.scala | 16 +++++++++++++++-
2 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 41ba6d3..288ff1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -230,15 +230,14 @@ object UnsupportedOperationChecker {
"streaming DataFrame/Dataset")
case Join(left, right, joinType, condition, _) =>
+ if (left.isStreaming && right.isStreaming && outputMode !=
InternalOutputModes.Append) {
+ throwError("Join between two streaming DataFrames/Datasets is not
supported" +
+ s" in ${outputMode} output mode, only in Append output mode")
+ }
joinType match {
-
case _: InnerLike =>
- if (left.isStreaming && right.isStreaming &&
- outputMode != InternalOutputModes.Append) {
- throwError("Inner join between two streaming
DataFrames/Datasets is not supported" +
- s" in ${outputMode} output mode, only in Append output mode")
- }
+ // no further validations needed
case FullOuter =>
if (left.isStreaming || right.isStreaming) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 28a164b..0fe646e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -404,7 +404,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.join(_, joinType = Inner),
outputMode = Update,
streamStreamSupported = false,
- expectedMsg = "inner join")
+ expectedMsg = "is not supported in Update output mode")
// Full outer joins: only batch-batch is allowed
testBinaryOperationInStreamingPlan(
@@ -422,6 +422,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
streamStreamSupported = false,
expectedMsg = "outer join")
+ // Left outer joins: update and complete mode not allowed
+ assertNotSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and update mode",
+ streamRelation.join(streamRelation, joinType = LeftOuter,
+ condition = Some(attribute === attribute)),
+ OutputMode.Update(),
+ Seq("is not supported in Update output mode"))
+ assertNotSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and complete mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType
= LeftOuter,
+ condition = Some(attribute === attribute))),
+ OutputMode.Complete(),
+ Seq("is not supported in Complete output mode"))
+
// Left outer joins: stream-stream allowed with join on watermark attribute
// Note that the attribute need not be watermarked on both sides.
assertSupportedInStreamingPlan(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]