This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8506728  [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation
8506728 is described below

commit 850672870faed6677d166c0029335600a15f4153
Author: liwensun <liwen....@databricks.com>
AuthorDate: Wed Sep 2 18:05:06 2020 +0900

    [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation
    
    PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries.
    
    Right now, the limit operator in a streaming query may get optimized away when the relation is empty. This is problematic for stateful streaming: the empty batch will not write any state store files, and the next batch will then fail with a file-not-found error when it tries to read those state store files.
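    As an illustration (not part of this patch), here is a minimal sketch of the kind of stateful streaming query with a limit that can hit this failure mode; the rate source and memory sink below are illustrative assumptions, not taken from the PR:

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("streaming-limit-sketch")
          .getOrCreate()

        // A streaming LIMIT keeps the number of rows emitted so far in a state store,
        // so every micro-batch, including an empty one, must commit state store files.
        val query = spark.readStream
          .format("rate")          // assumed source; any streaming source will do
          .load()
          .limit(1)
          .writeStream
          .format("memory")
          .queryName("limited")
          .outputMode("append")
          .start()

        // If the optimizer removes the limit for an empty batch, that batch writes no
        // state store files, and the next batch fails with a file-not-found error when
        // it tries to load the previous state.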
    
    We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries.
    
    This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task.
    
    No user-facing change.
    
    Tested with unit tests.
    
    Closes #29623 from liwensun/spark-32776.
    
    Authored-by: liwensun <liwen....@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit f0851e95c6a2c05716a32c961bf2842e18fe1bc7)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../optimizer/PropagateEmptyRelation.scala         |  4 +--
 .../optimizer/PropagateEmptyRelationSuite.scala    |  6 ++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 38 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index b19e138..7535f9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -85,8 +85,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
       case _: Filter => empty(p)
       case _: Sample => empty(p)
       case _: Sort => empty(p)
-      case _: GlobalLimit => empty(p)
-      case _: LocalLimit => empty(p)
+      case _: GlobalLimit if !p.isStreaming => empty(p)
+      case _: LocalLimit if !p.isStreaming => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
       // An aggregate with non-empty group expression will return one output row per group when the
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 9c7d4c7..535be12 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -221,4 +221,10 @@ class PropagateEmptyRelationSuite extends PlanTest {
     val optimized = Optimize.execute(query.analyze)
     assert(optimized.resolved)
   }
+
+  test("should not optimize away limit if streaming") {
+    val query = LocalRelation(Nil, Nil, isStreaming = true).limit(1).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, query)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 434b6a2..1f408d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -34,7 +34,9 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1141,6 +1143,42 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  testQuietly("limit on empty batch should not cause state store error") {
+    // The source only produces two batches, the first batch is empty and the second batch has data.
+    val source = new Source {
+      var batchId = 0
+      override def stop(): Unit = {}
+      override def getOffset: Option[Offset] = {
+        Some(LongOffset(batchId + 1))
+      }
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        if (batchId == 0) {
+          batchId += 1
+          Dataset.ofRows(spark, LocalRelation(schema.toAttributes, Nil, isStreaming = true))
+        } else {
+          Dataset.ofRows(spark,
+            LocalRelation(schema.toAttributes, InternalRow(10) :: Nil, isStreaming = true))
+        }
+      }
+      override def schema: StructType = MockSourceProvider.fakeSchema
+    }
+
+    MockSourceProvider.withMockSources(source) {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
+        .load()
+        .limit(1)
+
+      testStream(df)(
+        StartStream(),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(10))
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
