Hi All, I am trying to "test" a very simple custom receiver and am a little puzzled.
Using Spark 2.2.0 shell on my laptop, I am running the code below. I was expecting the code to timeout since my timeout wait period is 1 ms and I have a sleep in the class that is much more (1200 ms). Is this normal? Or am I interpreting something incorrectly? import org.apache.spark.streaming.Seconds import org.apache.spark.streaming._ class CustomReceiver extends org.apache.spark.streaming.receiver.Receiver[String](org.apache.spark.storage.StorageLevel.MEMORY_ONLY) { def onStart() { new Thread("CustomReceiver") { override def run() { receive() } }.start() } def onStop() {} private def receive() { val hostname = java.net.InetAddress.getLocalHost() val time = java.util.Calendar.getInstance.getTime var counter = 0 while (isStarted && !isStopped) { counter += 1 store(s"host = ${hostname} time = ${time} counter = ${counter}") Thread.sleep(1200) } } } val ssc = new StreamingContext(sc, Seconds(1)) val words = ssc.receiverStream(new CustomReceiver()) words.print() ssc.start() ssc.awaitTerminationOrTimeout(1)