Repository: spark Updated Branches: refs/heads/master 2242ab31e -> d70a07689
[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time tdas https://issues.apache.org/jira/browse/SPARK-7326 The problem most likely resides in DStream.slice() implementation, as shown below. def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { if (!isInitialized) { throw new SparkException(this + " has not been initialized") } if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } val alignedToTime = toTime.floor(slideDuration, zeroTime) val alignedFromTime = fromTime.floor(slideDuration, zeroTime) logInfo("Slicing from " + fromTime + " to " + toTime + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { if (time >= zeroTime) getOrCompute(time) else None }) } Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation. The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor : def floor(that: Duration, zeroTime: Time): Time = { val t = that.milliseconds new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds) } And then change the DStream.slice to call this new floor function by passing in its zeroTime. val alignedToTime = toTime.floor(slideDuration, zeroTime) val alignedFromTime = fromTime.floor(slideDuration, zeroTime) This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0. Author: Wesley Miao <[email protected]> Author: Wesley <[email protected]> Closes #5871 from wesleymiao/spark-7326 and squashes the following commits: 82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time 48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time 6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time 2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d70a0768 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d70a0768 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d70a0768 Branch: refs/heads/master Commit: d70a076892e0677acceccaba665908cdf664f1b4 Parents: 2242ab3 Author: Wesley Miao <[email protected]> Authored: Mon May 11 12:20:06 2015 +0100 Committer: Sean Owen <[email protected]> Committed: Mon May 11 12:20:06 2015 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/streaming/Time.scala | 5 +++++ .../spark/streaming/dstream/DStream.scala | 22 +++++++++++++------- .../org/apache/spark/streaming/TimeSuite.scala | 3 +++ 3 files changed, 22 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d70a0768/streaming/src/main/scala/org/apache/spark/streaming/Time.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 42c4967..92cfd7d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -63,6 +63,11 @@ case class Time(private val millis: Long) { new Time((this.millis / t) * t) } + def floor(that: Duration, zeroTime: Time): Time = { + val t = that.milliseconds + new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds) + } + def isMultipleOf(that: Duration): Boolean = (this.millis % that.milliseconds == 0) http://git-wip-us.apache.org/repos/asf/spark/blob/d70a0768/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index f1f8a70..7092a3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new SparkException(this + " has not been initialized") } - if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) { + toTime + } else { + logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + toTime.floor(slideDuration, zeroTime) } - if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) { + fromTime + } else { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + fromTime.floor(slideDuration, zeroTime) } - val alignedToTime = toTime.floor(slideDuration) - val alignedFromTime = fromTime.floor(slideDuration) logInfo("Slicing from " + fromTime + " to " + toTime + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") http://git-wip-us.apache.org/repos/asf/spark/blob/d70a0768/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala index 5579ac3..e6a0165 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala @@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase { assert(new Time(1200).floor(new Duration(200)) == new Time(1200)) assert(new Time(199).floor(new Duration(200)) == new Time(0)) assert(new Time(1).floor(new Duration(1)) == new Time(1)) + assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250)) + assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350)) + assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200)) } test("isMultipleOf") { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
