Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e8866f9fc -> c4de90fc7


[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty

## What changes were proposed in this pull request?

Right now `StreamingQuery.lastProgress` throws `NoSuchElementException` when no progress update has been reported yet, which makes it hard to use from Python: a Python user just sees an opaque `Py4JError`.

This PR makes it return null instead.
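
For illustration (not part of the patch): with this change a JVM-side caller can use a plain null check instead of catching `NoSuchElementException`. A minimal sketch, assuming `query` is any started `StreamingQuery`:

```scala
// Hypothetical usage sketch: lastProgress is null until the first
// progress update is reported, instead of throwing.
val progress = query.lastProgress
if (progress != null) {
  println(progress.json)   // safe: at least one update exists
} else {
  println("no progress reported yet")
}
```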

## How was this patch tested?

`test("lastProgress should be null when recentProgress is empty")`

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16273 from zsxwing/SPARK-18852.

(cherry picked from commit 1ac6567bdb03d7cc5c5f3473827a102280cb1030)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4de90fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4de90fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4de90fc

Branch: refs/heads/branch-2.1
Commit: c4de90fc76d5aa5d2c8fee4ed692d4ab922cbab0
Parents: e8866f9
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Dec 14 13:36:41 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Dec 14 13:36:55 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |  9 ++-
 python/pyspark/sql/tests.py                     | 18 ++++-
 .../execution/streaming/ProgressReporter.scala  |  4 +-
 .../streaming/StreamingQueryManagerSuite.scala  |  9 +--
 .../sql/streaming/StreamingQuerySuite.scala     | 21 +++++-
 .../sql/streaming/util/BlockingSource.scala     | 72 ++++++++++++++++++++
 .../sql/streaming/util/DefaultSource.scala      | 66 ------------------
 7 files changed, 121 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9cfb3fe..eabd5ef 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -125,10 +125,15 @@ class StreamingQuery(object):
     @since(2.1)
     def lastProgress(self):
         """
-        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
+        None if there were no progress updates
         :return: a map
         """
-        return json.loads(self._jsq.lastProgress().json())
+        lastProgress = self._jsq.lastProgress()
+        if lastProgress:
+            return json.loads(lastProgress.json())
+        else:
+            return None
 
     @since(2.0)
     def processAllAvailable(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 66320bd..115b4a9 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1119,9 +1119,25 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        q = df.writeStream \
+
+        def func(x):
+            time.sleep(1)
+            return x
+
+        from pyspark.sql.functions import col, udf
+        sleep_udf = udf(func)
+
+        # Use "sleep_udf" to delay the progress update so that we can test 
`lastProgress` when there
+        # were no updates.
+        q = df.select(sleep_udf(col("value")).alias('value')).writeStream \
             .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
+            # "lastProgress" will return None in most cases. However, as it 
may be flaky when
+            # Jenkins is very slow, we don't assert it. If there is something 
wrong, "lastProgress"
+            # may throw error with a high chance and make this test flaky, so 
we should still be
+            # able to detect broken codes.
+            q.lastProgress
+
             q.processAllAvailable()
             lastProgress = q.lastProgress
             recentProgress = q.recentProgress

http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 549b936..e40135f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -100,9 +100,9 @@ trait ProgressReporter extends Logging {
     progressBuffer.toArray
   }
 
-  /** Returns the most recent query progress update. */
+  /** Returns the most recent query progress update or null if there were no progress updates. */
   def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
-    progressBuffer.last
+    progressBuffer.lastOption.orNull
   }
 
   /** Begins recording statistics about query progress for a given trigger. */
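
For context, `lastOption.orNull` is the standard-library way to get this behavior: on an empty buffer `last` throws `NoSuchElementException`, while `lastOption` returns `None` and `orNull` turns that into null. A minimal standalone sketch, using a scratch `ArrayBuffer` rather than the real progress buffer:

    import scala.collection.mutable.ArrayBuffer

    val buffer = new ArrayBuffer[String]()
    // buffer.last                               // would throw NoSuchElementException
    assert(buffer.lastOption.orNull == null)     // null while the buffer is empty
    buffer += "progress-1"
    assert(buffer.lastOption.orNull == "progress-1")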

http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index d188319..1742a54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
 class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
@@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
   test("SPARK-18811: Source resolution should not block main thread") {
     failAfter(streamingTimeout) {
-      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      BlockingSource.latch = new CountDownLatch(1)
       withTempDir { tempDir =>
         // if source resolution was happening on the main thread, it would block the start call,
         // now it should only be blocking the stream execution thread
@@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
         eventually(Timeout(streamingTimeout)) {
           assert(sq.status.message.contains("Initializing sources"))
         }
-        StreamingQueryManagerSuite.latch.countDown()
+        BlockingSource.latch.countDown()
         sq.stop()
       }
     }
@@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
-
-object StreamingQueryManagerSuite {
-  var latch: CountDownLatch = null
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index afd788c..b052bd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import scala.collection.JavaConverters._
+import java.util.concurrent.CountDownLatch
 
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.ManualClock
 
 
@@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  test("lastProgress should be null when recentProgress is empty") {
+    BlockingSource.latch = new CountDownLatch(1)
+    withTempDir { tempDir =>
+      val sq = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .load()
+        .writeStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .option("checkpointLocation", tempDir.toString)
+        .start()
+      // Source creation is blocked, so recentProgress is empty and lastProgress should be null
+      assert(sq.lastProgress === null)
+      // Release the latch and stop the query
+      BlockingSource.latch.countDown()
+      sq.stop()
+    }
+  }
+
   test("codahale metrics") {
     val inputData = MemoryStream[Int]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
new file mode 100644
index 0000000..19ab2ff
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    BlockingSource.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}
+
+object BlockingSource {
+  var latch: CountDownLatch = null
+}
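
For reference, a condensed sketch of the coordination pattern the new test above uses with this source (assuming a `SparkSession` named `spark` and a temporary directory `tempDir`): arm the companion-object latch before `start()`, assert while `createSource` is still blocked, then release the latch:

    import java.util.concurrent.CountDownLatch

    BlockingSource.latch = new CountDownLatch(1)   // arm before starting the query
    val sq = spark.readStream
      .format("org.apache.spark.sql.streaming.util.BlockingSource")
      .load()
      .writeStream
      .format("org.apache.spark.sql.streaming.util.BlockingSource")
      .option("checkpointLocation", tempDir.toString)
      .start()
    assert(sq.lastProgress == null)    // createSource has not returned yet
    BlockingSource.latch.countDown()   // unblock source creation
    sq.stop()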

http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
deleted file mode 100644
index b0adf76..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming.util
-
-import org.apache.spark.sql.{SQLContext, _}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
-import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-
-/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
-class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
-
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
-
-  override def sourceSchema(
-      spark: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): (String, StructType) = {
-    ("dummySource", fakeSchema)
-  }
-
-  override def createSource(
-      spark: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    StreamingQueryManagerSuite.latch.await()
-    new Source {
-      override def schema: StructType = fakeSchema
-      override def getOffset: Option[Offset] = Some(new LongOffset(0))
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        import spark.implicits._
-        Seq[Int]().toDS().toDF()
-      }
-      override def stop() {}
-    }
-  }
-
-  override def createSink(
-      spark: SQLContext,
-      parameters: Map[String, String],
-      partitionColumns: Seq[String],
-      outputMode: OutputMode): Sink = {
-    new Sink {
-      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
-    }
-  }
-}

