Repository: spark
Updated Branches:
refs/heads/master 5d7994736 -> 1ac6567bd
[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request?
Right now `StreamingQuery.lastProgress` throws a NoSuchElementException when no progress
updates have been reported yet, which is hard to handle from Python because the Python user
only sees a Py4JError.
This PR just makes it return null instead.
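To illustrate the new contract from the caller's side, here is a minimal hedged sketch in
Scala; `describeProgress` is a hypothetical helper and not part of this patch, only the
null-returning `lastProgress` behavior is:

    import org.apache.spark.sql.streaming.StreamingQuery

    // Hypothetical helper for illustration: summarize the latest progress
    // of an already-started query without risking an exception.
    def describeProgress(query: StreamingQuery): String = {
      // Before this patch, lastProgress threw NoSuchElementException until the
      // first trigger completed; now it returns null, so Option(...) is safe.
      Option(query.lastProgress) match {
        case Some(progress) => progress.json
        case None => "no progress updates yet"
      }
    }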
## How was this patch tested?
`test("lastProgress should be null when recentProgress is empty")`
Author: Shixiong Zhu <[email protected]>
Closes #16273 from zsxwing/SPARK-18852.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ac6567b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ac6567b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ac6567b
Branch: refs/heads/master
Commit: 1ac6567bdb03d7cc5c5f3473827a102280cb1030
Parents: 5d79947
Author: Shixiong Zhu <[email protected]>
Authored: Wed Dec 14 13:36:41 2016 -0800
Committer: Shixiong Zhu <[email protected]>
Committed: Wed Dec 14 13:36:41 2016 -0800
----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |  9 ++-
 python/pyspark/sql/tests.py                     | 18 ++++-
 .../execution/streaming/ProgressReporter.scala  |  4 +-
 .../streaming/StreamingQueryManagerSuite.scala  |  9 +--
 .../sql/streaming/StreamingQuerySuite.scala     | 21 +++++-
 .../sql/streaming/util/BlockingSource.scala     | 72 ++++++++++++++++++++
 .../sql/streaming/util/DefaultSource.scala      | 66 ------------------
 7 files changed, 121 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9cfb3fe..eabd5ef 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -125,10 +125,15 @@ class StreamingQuery(object):
     @since(2.1)
     def lastProgress(self):
         """
-        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
+        None if there were no progress updates
         :return: a map
         """
-        return json.loads(self._jsq.lastProgress().json())
+        lastProgress = self._jsq.lastProgress()
+        if lastProgress:
+            return json.loads(lastProgress.json())
+        else:
+            return None
 
     @since(2.0)
     def processAllAvailable(self):
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index af7d52c..6ddd804 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1128,9 +1128,25 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        q = df.writeStream \
+
+        def func(x):
+            time.sleep(1)
+            return x
+
+        from pyspark.sql.functions import col, udf
+        sleep_udf = udf(func)
+
+        # Use "sleep_udf" to delay the progress update so that we can test `lastProgress` when there
+        # were no updates.
+        q = df.select(sleep_udf(col("value")).alias('value')).writeStream \
            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
+            # "lastProgress" will return None in most cases. However, as it may be flaky when
+            # Jenkins is very slow, we don't assert it. If there is something wrong, "lastProgress"
+            # may throw an error with a high chance and make this test flaky, so we should still be
+            # able to detect broken code.
+            q.lastProgress
+
             q.processAllAvailable()
             lastProgress = q.lastProgress
             recentProgress = q.recentProgress
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 549b936..e40135f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -100,9 +100,9 @@ trait ProgressReporter extends Logging {
     progressBuffer.toArray
   }
 
-  /** Returns the most recent query progress update. */
+  /** Returns the most recent query progress update or null if there were no progress updates. */
   def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
-    progressBuffer.last
+    progressBuffer.lastOption.orNull
   }
 
   /** Begins recording statistics about query progress for a given trigger. */
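The fix itself is the `lastOption.orNull` idiom; a tiny standalone sketch of its semantics,
using a plain queue of strings as a stand-in for `progressBuffer` (which actually holds
StreamingQueryProgress objects):

    import scala.collection.mutable

    // Stand-in for progressBuffer; String replaces StreamingQueryProgress.
    val buffer = new mutable.Queue[String]()
    assert(buffer.lastOption.orNull == null)          // empty: null, not NoSuchElementException
    buffer += "progress-1"
    assert(buffer.lastOption.orNull == "progress-1")  // non-empty: the last element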
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index d188319..1742a54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
 class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
@@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
   test("SPARK-18811: Source resolution should not block main thread") {
     failAfter(streamingTimeout) {
-      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      BlockingSource.latch = new CountDownLatch(1)
       withTempDir { tempDir =>
         // if source resolution was happening on the main thread, it would block the start call,
         // now it should only be blocking the stream execution thread
@@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
        eventually(Timeout(streamingTimeout)) {
          assert(sq.status.message.contains("Initializing sources"))
        }
-        StreamingQueryManagerSuite.latch.countDown()
+        BlockingSource.latch.countDown()
        sq.stop()
      }
   }
@@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
-
-object StreamingQueryManagerSuite {
-  var latch: CountDownLatch = null
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index afd788c..b052bd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import scala.collection.JavaConverters._
+import java.util.concurrent.CountDownLatch
 
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.ManualClock
 
@@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  test("lastProgress should be null when recentProgress is empty") {
+    BlockingSource.latch = new CountDownLatch(1)
+    withTempDir { tempDir =>
+      val sq = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .load()
+        .writeStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .option("checkpointLocation", tempDir.toString)
+        .start()
+      // Creating the source is blocked, so recentProgress is empty and lastProgress should be null
+      assert(sq.lastProgress === null)
+      // Release the latch and stop the query
+      BlockingSource.latch.countDown()
+      sq.stop()
+    }
+  }
+
   test("codahale metrics") {
     val inputData = MemoryStream[Int]
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
new file mode 100644
index 0000000..19ab2ff
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    BlockingSource.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}
+
+object BlockingSource {
+  var latch: CountDownLatch = null
+}
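For context, the latch is what lets a test observe a query before its source exists:
`createSource` parks on `latch.await()` until the test calls `countDown()`. A minimal hedged
sketch of the same handshake outside Spark (the thread and messages are illustrative only):

    import java.util.concurrent.CountDownLatch

    val latch = new CountDownLatch(1)
    val initThread = new Thread(() => {
      latch.await()               // parks, like BlockingSource.createSource
      println("initialized")
    })
    initThread.start()
    // ...assertions against the not-yet-initialized state go here...
    latch.countDown()             // releases the blocked thread
    initThread.join()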
http://git-wip-us.apache.org/repos/asf/spark/blob/1ac6567b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
deleted file mode 100644
index b0adf76..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming.util
-
-import org.apache.spark.sql.{SQLContext, _}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
-import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-
-/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
-class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
-
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
-
-  override def sourceSchema(
-      spark: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): (String, StructType) = {
-    ("dummySource", fakeSchema)
-  }
-
-  override def createSource(
-      spark: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    StreamingQueryManagerSuite.latch.await()
-    new Source {
-      override def schema: StructType = fakeSchema
-      override def getOffset: Option[Offset] = Some(new LongOffset(0))
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        import spark.implicits._
-        Seq[Int]().toDS().toDF()
-      }
-      override def stop() {}
-    }
-  }
-
-  override def createSink(
-      spark: SQLContext,
-      parameters: Map[String, String],
-      partitionColumns: Seq[String],
-      outputMode: OutputMode): Sink = {
-    new Sink {
-      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
-    }
-  }
-}