Re: issue about the windows slice of stream

2017-06-26 Thread Fei Shao
Hi Owen,


Could you please help me check this issue?
Is it a potential bug or not?


thanks
Fei Shao




 
---Original---
From: <1427357...@qq.com>
Date: 2017/6/25 21:44:41
To: "user"<user@spark.apache.org>;"dev"<d...@spark.apache.org>;
Subject: Re: issue about the windows slice of stream


Hi all,


Let me add more info about this.
The log showed:
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
The slice time range is wrong.
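The INFO line about 1498383078000 ms spells out the rule it applies. The Scala sketch below restates that rule under an assumption (it is not copied from Spark): a batch time counts as valid only if it is strictly later than zeroTime and its difference from zeroTime is a whole multiple of the slideDuration (1000 ms for ShuffledDStream here). `TimeValiditySketch` and `timeIsValid` are hypothetical names working on plain millisecond Longs, not Spark APIs.

```scala
// Hypothetical, Spark-free sketch of the batch-time validity rule suggested by
// the log line "Time ... is invalid as zeroTime is ..., slideDuration is ...
// and difference is ...". All times are plain milliseconds (Long).
object TimeValiditySketch {
  // Assumed rule: valid only if strictly after zeroTime and aligned to the
  // slide (batch) duration.
  def timeIsValid(timeMs: Long, zeroTimeMs: Long, slideMs: Long): Boolean = {
    val difference = timeMs - zeroTimeMs
    difference > 0 && difference % slideMs == 0
  }

  def main(args: Array[String]): Unit = {
    val zeroTime = 1498383078000L // "Zero time" from the log
    val batch = 1000L             // ShuffledDStream slideDuration from the log
    // Times covered by "Slicing from 1498383077000 ms to 1498383084000 ms"
    val sliced = (1498383077000L to 1498383084000L by batch).toList
    val valid = sliced.filter(t => timeIsValid(t, zeroTime, batch))
    println(s"sliced times: ${sliced.size}, valid times: ${valid.size}")
    println(s"first valid time: ${valid.head}")
  }
}
```

Under this assumed rule, only 1498383079000 ms through 1498383084000 ms (6 of the 8 sliced times) would yield RDDs, which agrees with 1498383079000 ms being the first time logged as valid.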


For my test code:
lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(s => { ... })
Here the windowDuration is 2 seconds and the slideDuration is 8 seconds.
===key log begin
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
<== Here the old RDDs are sliced from 1498383077000 ms to 1498383084000 ms, which spans 8 seconds. It should actually span 2 seconds.
===key log end
===code in ReducedWindowedDStream.scala begin
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
  val reduceF = reduceFunc
  val invReduceF = invReduceFunc

  val currentTime = validTime
  val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
    currentTime)
  val previousWindow = currentWindow - slideDuration

  logDebug("Window time = " + windowDuration)
  logDebug("Slide time = " + slideDuration)
  logDebug("Zero time = " + zeroTime)
  logDebug("Current window = " + currentWindow)
  logDebug("Previous window = " + previousWindow)

  //  _____________________________
  // |  previous window   _________|___________________
  // |___________________|       current window        |  --------------> Time
  //                     |_____________________________|
  //
  // |________ _________|          |________ _________|
  //          |                             |
  //          V                             V
  //       old RDDs                     new RDDs
  //

  // Get the RDDs of the reduced values in "old time steps"
  val oldRDDs =
    reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
  // <== I think this line should be:
  // reducedStream.slice(previousWindow.beginTime,
  //   currentWindow.beginTime + windowDuration - parent.slideDuration)
  logDebug("# old RDDs = " + oldRDDs.size)

  // Get the RDDs of the reduced values in "new time steps"
  val newRDDs =
    reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
  // <== I think this line should be:
  // reducedStream.slice(previousWindow.endTime + windowDuration - parent.slideDuration,
  //   currentWindow.endTime)
  logDebug("# new RDDs = " + newRDDs.size)
  // ... (rest of compute() omitted)
}
===code in ReducedWindowedDStream.scala end
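To make the arithmetic in compute() concrete, here is a minimal Spark-free Scala sketch that plugs the logged values (batch 1000 ms, window 2000 ms, slide 8000 ms, validTime 1498383086000 ms) into the same expressions as the excerpt. The `Span` case class and `WindowSlices` object are hypothetical stand-ins for Spark's `Interval` and `Time` that use plain millisecond Longs, and the slice ranges are assumed to be inclusive at both ends, as the "Slicing from ... to ..." log line suggests.

```scala
// Hypothetical stand-in for Spark's Interval: [beginMs, endMs], in milliseconds.
final case class Span(beginMs: Long, endMs: Long) {
  // Shift the whole span back in time, like `currentWindow - slideDuration`.
  def -(shiftMs: Long): Span = Span(beginMs - shiftMs, endMs - shiftMs)
  // Number of batch times in the inclusive range, stepping by batchMs.
  def batches(batchMs: Long): Long = (endMs - beginMs) / batchMs + 1
}

object WindowSlices {
  // Mirrors the expressions quoted from ReducedWindowedDStream.compute().
  // Returns (currentWindow, previousWindow, oldSlice, newSlice).
  def compute(validTime: Long, windowMs: Long, slideMs: Long, batchMs: Long): (Span, Span, Span, Span) = {
    val currentWindow = Span(validTime - windowMs + batchMs, validTime)
    val previousWindow = currentWindow - slideMs
    val oldSlice = Span(previousWindow.beginMs, currentWindow.beginMs - batchMs)
    val newSlice = Span(previousWindow.endMs + batchMs, currentWindow.endMs)
    (currentWindow, previousWindow, oldSlice, newSlice)
  }

  def main(args: Array[String]): Unit = {
    val (cur, prev, oldS, newS) = compute(1498383086000L, 2000L, 8000L, 1000L)
    println(s"current window  = $cur")
    println(s"previous window = $prev")
    println(s"old slice       = $oldS (${oldS.batches(1000L)} batches)")
    println(s"new slice       = $newS (${newS.batches(1000L)} batches)")
  }
}
```

With these inputs the sketch reproduces the logged current and previous windows and the "Slicing from 1498383077000 ms to 1498383084000 ms" range: the old slice covers 8 batches (8 seconds), while the window itself covers only 2000 / 1000 = 2 batches.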



Thanks
Fei Shao
 
---Original---
From: <1427357...@qq.com>
Date: 2017/6/24 14:51:52
To: "user"<user@spark.apache.org>;"dev"<d...@spark.apache.org>;
Subject: issue about the windows slice of stream


Hi all,
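I found an issue with the window slicing of DStreams.
My code is:


import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/checkpoint")  // hypothetical path; windowed counts with an inverse reduce need checkpointing
// "ip" and port are placeholders for the socket source's host and port
val content = ssc.socketTextStream("ip", port)
content.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(rdd => rdd.collect().foreach(println))
ssc.start()
ssc.awaitTermination()
The key point is that the slide duration is greater than the window duration.
I checked the output, and the printed results were wrong.
I found that the stream was sliced incorrectly.
Can I open a JIRA for this?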
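To judge whether the printed counts are wrong, it helps to know what a 2-second window sliding every 8 seconds should contain. The following is a minimal Spark-free Scala sketch of that expectation, assuming 1-second batches numbered from 1, window ends at every multiple of the slide (8, 16, ...), and that a window ending at batch t covers only the last windowMs / batchMs batches up to and including t. `ExpectedWindowCounts` and `expectedWindowCounts` are hypothetical names, not Spark APIs.

```scala
object ExpectedWindowCounts {
  // batches(i) holds the lines received in batch number i + 1 (1-second batches).
  // Returns, for each window end (a batch number), the count of each value
  // inside that window only.
  def expectedWindowCounts(
      batches: IndexedSeq[Seq[String]],
      windowBatches: Int,
      slideBatches: Int): Seq[(Int, Map[String, Long])] = {
    // Assumed window ends: every slideBatches-th batch.
    val ends = slideBatches to batches.size by slideBatches
    ends.map { end =>
      // Batch numbers (end - windowBatches + 1) to end, i.e. indices end - windowBatches until end.
      val inWindow = batches.slice(end - windowBatches, end).flatten
      end -> inWindow.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }
    }
  }

  def main(args: Array[String]): Unit = {
    // 16 hypothetical batches: batch n contains the line s"b$n" plus one "x".
    val batches = (1 to 16).map(n => Seq(s"b$n", "x"))
    expectedWindowCounts(batches, windowBatches = 2, slideBatches = 8).foreach {
      case (end, counts) => println(s"window ending at batch $end: $counts")
    }
  }
}
```

Because the slide is longer than the window, each window here should report `x` twice and only `b7`/`b8` or `b15`/`b16`; batches 1 to 6 and 9 to 14 fall between windows and should not be counted at all.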


thanks
Fei Shao
