Hello Sanjay,

Yes, your understanding of lazy semantics is correct. But ideally
every batch should be read based on the batch interval provided in the
StreamingContext. Can you open a JIRA on this?

On Mon, Mar 24, 2014 at 7:45 AM, Sanjay Awatramani
<sanjay_a...@yahoo.com> wrote:
> Hi All,
>
> I found out why this problem exists. Consider the following scenario:
> - a DStream is created from any source. (I've checked with file and socket)
> - No actions are applied to this DStream
> - Sliding Window operation is applied to this DStream and an action is
> applied to the sliding window.
> In this case, Spark will not even read the input stream in batches
> whose end time isn't a multiple of the slide interval. Put another
> way, it won't read the input when it doesn't have to apply the window
> function. This happens because all transformations in Spark are lazy.
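The dependency structure described above can be sketched in plain Java (no Spark involved; the class and method names are illustrative, and the window is assumed to cover the interval (t - windowLen, t] in batch-interval units). The point: for (3,2), every batch 0..5 is an input to some window, so each batch must be read even though no action touches the raw stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class WindowDependencies {
    // Batches that a window ending at batch t covers, assuming the
    // window spans (t - windowLen, t] and batches start at index 0.
    static List<Integer> batchesInWindow(int t, int windowLen) {
        List<Integer> batches = new ArrayList<>();
        for (int b = t - windowLen + 1; b <= t; b++) {
            if (b >= 0) batches.add(b);
        }
        return batches;
    }

    public static void main(String[] args) {
        int windowLen = 3, slideInt = 2, lastBatch = 5;
        TreeSet<Integer> needed = new TreeSet<>();
        // Windows fire at t_1, t_3, t_5 for a slide interval of 2,
        // matching the timeline in the results below.
        for (int t = slideInt - 1; t <= lastBatch; t += slideInt) {
            needed.addAll(batchesInWindow(t, windowLen));
        }
        // The union covers every batch 0..5: each one is a dependency
        // of some window, so none can be skipped.
        System.out.println(needed);
    }
}
```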
>
> How to fix this or work around it (see the print() call below):
> JavaStreamingContext stcObj =
>     new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
> JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
> inputStream.print(); // This dummy action is the workaround
> JavaDStream<String> objWindow = inputStream.window(
>     new Duration(windowLen * 60 * 1000), new Duration(slideInt * 60 * 1000));
> objWindow.dstream().saveAsTextFiles("/Output", "");
>
>
> The "Window operations" example in the streaming guide implies that Spark
> reads the stream in every batch, which is not happening because of the
> lazy transformations.
> In most sliding-window use cases, no action will be applied to the
> pre-window stream, hence my gut feeling was that Streaming would read
> every batch as long as some action is applied to the windowed stream.
>
> Regards,
> Sanjay
>
>
> On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani <sanjay_a...@yahoo.com>
> wrote:
> Hi,
>
> I want to run a map/reduce process over the last 5 seconds of data, every 4
> seconds. This is quite similar to the sliding-window pictorial example under
> the "Window Operations" section of
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
>
> The RDDs returned by the window transformation are incorrect in my
> case. To investigate this further, I ran a series of examples with varying
> values of window length & slide interval. Summary of the test results:
> (window length, slide interval) -> result
> (3,1) -> success
> (4,2) -> success
> (3,2) -> fail
> (4,3) -> fail
> (5,4) -> fail
> (5,2) -> fail
>
> The only condition mentioned in the doc is that the two values (5 & 4)
> should be multiples of the batch interval (1 in my case) and obviously, I
> get a run-time error if I attempt to violate this condition. Looking at my
> results, it seems that failures occur when the window length isn't a
> multiple of the slide interval.
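The pattern in the summary table can be checked mechanically. A plain-Java sketch (not part of the original mail; the class name is illustrative) predicting success exactly when the window length is a multiple of the slide interval, compared against the observed results:

```java
public class WindowSlidePattern {
    // Predicted success: window length is an exact multiple of the
    // slide interval, so every window boundary aligns with a slide.
    static boolean windowMultipleOfSlide(int windowLen, int slideInt) {
        return windowLen % slideInt == 0;
    }

    public static void main(String[] args) {
        int[][] pairs = { {3, 1}, {4, 2}, {3, 2}, {4, 3}, {5, 4}, {5, 2} };
        boolean[] observedSuccess = { true, true, false, false, false, false };
        for (int i = 0; i < pairs.length; i++) {
            boolean predicted = windowMultipleOfSlide(pairs[i][0], pairs[i][1]);
            // Prediction matches the observed result for every pair tested.
            System.out.printf("(%d,%d) predicted=%s observed=%s%n",
                    pairs[i][0], pairs[i][1],
                    predicted ? "success" : "fail",
                    observedSuccess[i] ? "success" : "fail");
        }
    }
}
```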
>
> My code:
> JavaStreamingContext stcObj =
>     new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
> JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
> JavaDStream<String> objWindow = inputStream.window(
>     new Duration(windowLen * 60 * 1000), new Duration(slideInt * 60 * 1000));
> objWindow.dstream().saveAsTextFiles("/Output", "");
>
> Detailed results:
> (3,1) -> success
> @t_0: [inputStream's RDD@t_0]
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: [inputStream's RDD@t_0,1,2]
> @t_3: [inputStream's RDD@t_1,2,3]
> @t_4: [inputStream's RDD@t_2,3,4]
> @t_5: [inputStream's RDD@t_3,4,5]
>
> (4,2) -> success
> @t_0: nothing
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: nothing
> @t_3: [inputStream's RDD@t_0,1,2,3]
> @t_4: nothing
> @t_5: [inputStream's RDD@t_2,3,4,5]
>
> (3,2) -> fail
> @t_0: nothing
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: nothing
> @t_3: [inputStream's RDD@t_2,3]    //(expected RDD@t_1,2,3)
> @t_4: nothing
> @t_5: [inputStream's RDD@t_4,5]    //(expected RDD@t_3,4,5)
>
> (4,3) -> fail
> @t_0: nothing
> @t_1: nothing
> @t_2: [inputStream's RDD@t_0,1,2]
> @t_3: nothing
> @t_4: nothing
> @t_5: [inputStream's RDD@t_3,4,5]    //(expected RDD@t_2,3,4,5)
>
> (5,4) -> fail
> @t_0: nothing
> @t_1: nothing
> @t_2: nothing
> @t_3: [inputStream's RDD@t_0,1,2,3]
> @t_4: nothing
> @t_5: nothing
> @t_6: nothing
> @t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)
>
> (5,2) -> fail
> @t_0: nothing
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: nothing
> @t_3: [inputStream's RDD@t_0,1,2,3]
> @t_4: nothing
> @t_5: [inputStream's RDD@t_2,3,4,5]    //(expected RDD@t_1,2,3,4,5)
> @t_6: nothing
> @t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)
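All of the "expected" entries above follow a single rule: the window ending at t covers the last windowLen batches, clipped at batch 0. A plain-Java sketch (mine, not from the original mail; names are illustrative) that regenerates the expected column, shown here for the (5,2) case:

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedWindows {
    // Expected contents of the window ending at batch t: the last
    // windowLen batches, clipped so we never go before batch 0.
    static List<Integer> expectedWindow(int t, int windowLen) {
        List<Integer> contents = new ArrayList<>();
        for (int b = Math.max(0, t - windowLen + 1); b <= t; b++) {
            contents.add(b);
        }
        return contents;
    }

    public static void main(String[] args) {
        // (5,2) case: windows fire at t_1, t_3, t_5, t_7.
        int windowLen = 5, slideInt = 2;
        for (int t = slideInt - 1; t <= 7; t += slideInt) {
            System.out.println("@t_" + t + ": " + expectedWindow(t, windowLen));
        }
    }
}
```

Running this prints [0, 1], [0, 1, 2, 3], [1, 2, 3, 4, 5], and [3, 4, 5, 6, 7] for t_1, t_3, t_5, t_7, matching the expected (5,2) entries above, including the two the observed output got wrong.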
>
> I have run all of the above examples twice to be sure!
> I believe either my understanding of the sliding-window mechanism is
> incorrect or there is a problem in the mechanism itself.
>
> Regards,
> Sanjay
>
>
