[ 
https://issues.apache.org/jira/browse/FLINK-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368957#comment-16368957
 ] 

Stephan Ewen commented on FLINK-8663:
-------------------------------------

I don't think this is wrong, it depends on what you assume as the definition of 
{{countWindow(10, 5)}}.

The current definition starts with sliding the windows by 5 from the start. 
That results in the behavior above.

It is as correct or incorrect as other variants - it depends on how the 
behavior is specified.

> Execution of DataStreams result in non functionality of size of Window for 
> countWindow
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-8663
>                 URL: https://issues.apache.org/jira/browse/FLINK-8663
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>         Environment: package com.vnl.stocks;
> import java.util.concurrent.TimeUnit;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.AllWindowedStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.WindowedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> public class StocksProcessing {
>     
>     public static void main(String[] args) throws Exception {
>             final StreamExecutionEnvironment env =
>                     StreamExecutionEnvironment.getExecutionEnvironment();
>         
>                 //Read from a socket stream at map it to StockPrice objects
>                 DataStream<StockPrice> socketStockStream = env
>                         .socketTextStream("localhost", 9999)
>                         .map(new MapFunction<String, StockPrice>() {
>                             private String[] tokens;
>         
>                             @Override
>                             public StockPrice map(String value) throws 
> Exception {
>                                 tokens = value.split(",");
>                                 return new StockPrice(tokens[0],
>                                     Double.parseDouble(tokens[1]));
>                             }
>                         });
>                 
>                 socketStockStream.print();
>                 //Generate other stock streams
>                 DataStream<StockPrice> SPX_stream = env.addSource(new 
> StockSource("SPX", 10));
>               //  DataStream<StockPrice> FTSE_stream = env.addSource(new 
> StockSource("FTSE", 20));
>               //  DataStream<StockPrice> DJI_stream = env.addSource(new 
> StockSource("DJI", 30));
>               //  DataStream<StockPrice> BUX_stream = env.addSource(new 
> StockSource("BUX", 40));
>         
>                 //Merge all stock streams together
>                 
>                 DataStream<StockPrice> stockStream = 
> socketStockStream.union(SPX_stream/*, FTSE_stream, DJI_stream, BUX_stream*/);
>                 
>                 
>                // stockStream.print();
>                 Thread.sleep(100);
>                                                 
>                 AllWindowedStream<StockPrice, GlobalWindow> windowedStream = 
> stockStream
>                         .countWindowAll(10, 5);
>                         
>                         //.keyBy("symbol")
>                         //.timeWindowAll(Time.of(10, TimeUnit.SECONDS), 
> Time.of(1, TimeUnit.SECONDS));
>                 
>                     //stockStream.keyBy("symbol");
>                     //Compute some simple statistics on a rolling window
>                     DataStream<StockPrice> lowest = 
> windowedStream.maxBy("price");
>                     //DataStream<StockPrice> highest = windowedStream.;
>                     /*DataStream<StockPrice> maxByStock = 
> windowedStream.groupBy("symbol")
>                         .maxBy("price").flatten();
>                     DataStream<StockPrice> rollingMean = 
> windowedStream.groupBy("symbol")
>                         .mapWindow(new WindowMean()).flatten();*/
>                     lowest.print();
>                     
>                       Thread.sleep(100);
>                 /*    
>                     AllWindowedStream<StockPrice, GlobalWindow> 
> windowedStream1 = lowest
>                             .countWindowAll(5,2);
>                     //windowedStream1.print();
>                     DataStream<StockPrice> highest = 
> windowedStream1.minBy("price");*/
>                     //highest.print();
>                     
>                     env.execute("Stock stream");
>         }
> }
>            Reporter: Subham
>            Priority: Major
>
> I used AllWindowedStream<?,GlobalWindow> to process a stream and generate 
> maximum of my window using countWindowAll functions. In this output the size 
> and slide of window works incorrectly.
> Refer below example for the bug
> Initial stream : 1,2,3,4,5,6.........
> Output 1: (Find min for window 10,5) : 1,6,11.....(This is correct)
> However if i calculate maximum, I get output as:
> Output 2: (Find max for window 10,5) : 5,10,15.... (which is wrong)
> Expected: 10,15,20....
>  
> Please resolve this error.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to