Platon Potapov created SPARK-6232:
-------------------------------------
Summary: Spark Streaming: simple application stalls processing
Key: SPARK-6232
URL: https://issues.apache.org/jira/browse/SPARK-6232
Project: Spark
Issue Type: Bug
Components: Streaming
Affects Versions: 1.2.1
Environment: Ubuntu, macOS.
Reporter: Platon Potapov
Priority: Critical
Below is a simple test application.
Run it in one terminal window, and "nc -lk 9999" in another.
Once per second, enter a number (so that the window slides over several
non-empty RDDs). Two or three numbers are enough for the program to stall
with the following output:
{code}
-------------------------------------------
Time: 1425922369000 ms
-------------------------------------------
-------------------------------------------
Time: 1425922370000 ms
-------------------------------------------
(1.0,4.0)
-------------------------------------------
Time: 1425922371000 ms
-------------------------------------------
(1.0,4.0)
[Stage 17:=============================> (1 + 0) / 2]
{code}
We have tried both standalone (local master) and clustered setups; the bug
reproduces in all cases. We have also tried both raw sockets and Kafka as the
receiver; it reproduces with both.
NOTE that the bug does NOT reproduce under any of the following conditions:
* the receiver is a queue (StreamingContext.queueStream)
* the commented-out "print" is un-commented
* the window+reduceByKey pair is replaced with reduceByKeyAndWindow
Here is the simple test application:
{code}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._
object SparkStreamingTest extends App {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingTest")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val lines0 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines0.map(x => (1.0, x.toDouble))
  // words.print() // TODO: enable this print to avoid the program freeze
  val windowed = words.window(Seconds(4), Seconds(1))
  val grouped = windowed.reduceByKey(_ + _)
  grouped.print()
  ssc.start()
  ssc.awaitTermination()
}
{code}
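For reference, the reduceByKeyAndWindow substitution under which the stall does
not occur is a one-line change to the pipeline above. A minimal sketch (the
object name is arbitrary; window and slide durations match the original
application):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object SparkStreamingWorkaroundTest extends App {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWorkaroundTest")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val lines0 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines0.map(x => (1.0, x.toDouble))
  // Single windowed reduction instead of window(...) followed by reduceByKey(...)
  val grouped = words.reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(1))
  grouped.print()
  ssc.start()
  ssc.awaitTermination()
}
{code}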
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)