Repository: spark
Updated Branches:
  refs/heads/master 0c23e254c -> 7f1b6b182


[SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is 
available

## What changes were proposed in this pull request?

Avoid unnecessary sleep (10 ms) in each invocation of 
MemoryStreamDataReader.next.

## How was this patch tested?

Ran ContinuousSuite from IDE.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Arun Mahadevan <ar...@apache.org>

Closes #21207 from arunmahadevan/memorystream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f1b6b18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f1b6b18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f1b6b18

Branch: refs/heads/master
Commit: 7f1b6b182e3cf3cbf29399e7bfbe03fa869e0bc8
Parents: 0c23e25
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri May 4 16:02:21 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri May 4 16:02:21 2018 +0800

----------------------------------------------------------------------
 .../streaming/sources/ContinuousMemoryStream.scala          | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f1b6b18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index c28919b..a8fca3c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -183,11 +183,10 @@ class ContinuousMemoryStreamDataReader(
   private var current: Option[Row] = None
 
   override def next(): Boolean = {
-    current = None
+    current = getRecord
     while (current.isEmpty) {
       Thread.sleep(10)
-      current = endpoint.askSync[Option[Row]](
-          GetRecord(ContinuousMemoryStreamPartitionOffset(partition, 
currentOffset)))
+      current = getRecord
     }
     currentOffset += 1
     true
@@ -199,6 +198,10 @@ class ContinuousMemoryStreamDataReader(
 
   override def getOffset: ContinuousMemoryStreamPartitionOffset =
     ContinuousMemoryStreamPartitionOffset(partition, currentOffset)
+
+  private def getRecord: Option[Row] =
+    endpoint.askSync[Option[Row]](
+      GetRecord(ContinuousMemoryStreamPartitionOffset(partition, 
currentOffset)))
 }
 
 case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to