Hi all,
I have code like the following:

    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    // Logger.getLogger("org.apache.spark.streaming.dstream").setLevel(Level.DEBUG)
    val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
    // val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("E:\\spark\\tmp\\cp")
    val lines = ssc.socketTextStream("127.0.0.1", 9999)

    // Print every 1-second batch as it arrives.
    lines.foreachRDD(r => {
      println("RDD" + r.id + "begin" + " " + new SimpleDateFormat("yyyy-mm-dd HH:MM:SS").format(new Date()))
      r.foreach(ele => println(":::" + ele))
      println("RDD" + r.id + "end")
    })

    // Print the per-value counts for each window.
    lines.countByValueAndWindow(Seconds(4), Seconds(1)).foreachRDD(s => { // here is key code
      println("countByValueAndWindow RDD ID IS : " + s.id + "begin")
      println("time is " + new SimpleDateFormat("yyyy-mm-dd HH:MM:SS").format(new Date()))
      s.foreach(e => println("data is " + e._1 + " :" + e._2))
      println("countByValueAndWindow RDD ID IS : " + s.id + "end")
    })

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate

I run the code and use "nc" to send messages manually, typing about one letter per second. I know the times in the log do not exactly equal the window boundaries, but I think they are very close. The output, with my comments, is:

-----------------------------------------------------------
RDD1begin 2017-41-27 22:06:16
RDD1end
countByValueAndWindow RDD ID IS : 7 begin
time is 2017-41-27 22:06:16
countByValueAndWindow RDD ID IS : 7 end
RDD8begin 2017-41-27 22:06:17
RDD8end
countByValueAndWindow RDD ID IS : 13 begin
time is 2017-41-27 22:06:17
countByValueAndWindow RDD ID IS : 13 end
RDD14begin 2017-41-27 22:06:18
:::1
RDD14end
countByValueAndWindow RDD ID IS : 19 begin
time is 2017-41-27 22:06:18 <== data from 22:06:15 -- 22:06:18 is in RDD 14.
data is 1 :1
countByValueAndWindow RDD ID IS : 19 end
RDD20begin 2017-41-27 22:06:19
:::2
RDD20end
countByValueAndWindow RDD ID IS : 25 begin
time is 2017-41-27 22:06:19 <== data from 22:06:16 -- 22:06:19 is in RDD 14, 20.
data is 1 :1
data is 2 :1
countByValueAndWindow RDD ID IS : 25 end
RDD26begin 2017-41-27 22:06:20
:::3
RDD26end
countByValueAndWindow RDD ID IS : 31 begin
time is 2017-41-27 22:06:20 <== data from 22:06:17 -- 22:06:20 is in RDD 14, 20, 26.
data is 2 :1
data is 1 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 31 end
RDD32begin 2017-41-27 22:06:21
:::4
RDD32end
countByValueAndWindow RDD ID IS : 37 begin
time is 2017-41-27 22:06:21 <== data from 22:06:18 -- 22:06:21 is in RDD 14, 20, 26, 32.
data is 2 :1
data is 1 :1
data is 4 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 37 end
RDD38begin 2017-41-27 22:06:22
:::5
:::6
RDD38end
countByValueAndWindow RDD ID IS : 43 begin
time is 2017-41-27 22:06:22 <== data from 22:06:19 -- 22:06:22 is in RDD 20, 26, 32, 38. Here 14 is out of the window.
data is 4 :1
data is 5 :1
data is 6 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 43 end
RDD44begin 2017-41-27 22:06:23
:::7
RDD44end
countByValueAndWindow RDD ID IS : 49 begin
time is 2017-41-27 22:06:23 <== data from 22:06:20 -- 22:06:23 is in RDD 26, 32, 38, 44. Here 20 is out of the window.
data is 5 :1
data is 4 :1
data is 6 :1
data is 7 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 49 end
-----------------------------------------------------------

I think the foreachRDD function outputs the latest RDD calculated by countByValueAndWindow, and the log above confirms my idea.

Now I change the key line (marked "here is key code") to

    lines.countByValueAndWindow(Seconds(4), Seconds(6)).foreachRDD(s => { // here is key code

so the slide duration is 6 seconds.
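To state the expectation precisely, here is a minimal sketch in plain Scala (no Spark involved). The object and helper names are made up for illustration. It assumes batches align on whole seconds, that each batch is named by its end second, and that a window ending at second t covers the batches ending in (t - window, t]. It only models what I expect to see, not what Spark actually computes.

```scala
// Hypothetical model of the expected window contents; not Spark code.
object WindowExpectation {

  // Batches (named by their end second) that a window ending at
  // `windowEnd` should contain, assuming it covers (windowEnd - windowSec, windowEnd].
  def expectedBatches(windowEnd: Int, windowSec: Int, batchSec: Int = 1): List[Int] =
    ((windowEnd - windowSec + batchSec) to windowEnd by batchSec).toList

  // End seconds of the first `count` windows for a given slide duration.
  def windowEnds(firstEnd: Int, slideSec: Int, count: Int): List[Int] =
    List.tabulate(count)(i => firstEnd + i * slideSec)

  def main(args: Array[String]): Unit = {
    // Second experiment: window = 4 s, slide = 6 s, first output at second 17.
    windowEnds(17, 6, 3).foreach { end =>
      println(s"window ending at :$end -> batches " + expectedBatches(end, 4).mkString(", "))
    }
    // window ending at :17 -> batches 14, 15, 16, 17
    // window ending at :23 -> batches 20, 21, 22, 23
    // window ending at :29 -> batches 26, 27, 28, 29
  }
}
```

Under this model, with a 6-second slide and a 4-second window, two batches between consecutive windows should never appear in any output.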
The log, with my comments, is below:

-----------------------------------------------------------
RDD1begin 2017-59-27 10:59:12
RDD1end
RDD2begin 2017-59-27 10:59:13
:::1
:::2
RDD2end
RDD3begin 2017-59-27 10:59:14
:::3
RDD3end
RDD4begin 2017-59-27 10:59:15
:::4
RDD4end
RDD5begin 2017-59-27 10:59:16
:::5
RDD5end
RDD6begin 2017-59-27 10:59:17
RDD6end
countByValueAndWindow RDD ID IS : 22 begin
time is 2017-59-27 10:59:17 <== I think this is OK, even though RDD 2 is included.
data is 4 :1
data is 5 :1
data is 1 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 22 end
RDD23begin 2017-59-27 10:59:18
:::6
RDD23end
RDD24begin 2017-59-27 10:59:19
:::8
:::7
RDD24end
RDD25begin 2017-59-27 10:59:20
:::9
RDD25end
RDD26begin 2017-59-27 10:59:21
:::0
RDD26end
RDD27begin 2017-59-27 10:59:22
:::-
RDD27end
RDD28begin 2017-59-27 10:59:23
:::p
RDD28end
countByValueAndWindow RDD ID IS : 43 begin
time is 2017-59-27 10:59:23 <== the data between 10:59:20 -- 10:59:23 should come from RDD 25, 26, 27, 28, but the data is wrong.
data is 6 :1
data is 2 :1
data is 9 :1
data is - :1
data is 1 :1
data is 8 :1
data is p :1
data is 0 :1
data is 7 :1
countByValueAndWindow RDD ID IS : 43 end
RDD44begin 2017-59-27 10:59:24
:::o
RDD44end
RDD46begin 2017-59-27 10:59:25
:::i
RDD46end
RDD47begin 2017-59-27 10:59:26
:::u
RDD47end
RDD48begin 2017-59-27 10:59:27
:::y
RDD48end
RDD49begin 2017-59-27 10:59:28
:::t
RDD49end
RDD50begin 2017-59-27 10:59:29
:::r
RDD50end
countByValueAndWindow RDD ID IS : 65 begin
time is 2017-59-27 10:59:29 <== this is wrong too.
data is 6 :1
data is 2 :1
data is r :1
data is 8 :1
data is t :1
data is i :1
data is y :1
data is u :1
data is 1 :1
data is 7 :1
data is o :1
countByValueAndWindow RDD ID IS : 65 end
-----------------------------------------------------------

Could you please tell me why the second log does not match my understanding? This issue has been troubling me for several days.

Thanks,
Fei Shao