[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060881#comment-16060881 ]

Brice Bingman commented on FLINK-6990:
--------------------------------------

[~Cody] I chose the smallest scenario that shows the slowdown on my machine.  
If you are on a machine with better resources, you may need to increase the 
event input rate or increase the window size to see the slowdown.  I noticed 
that if I reduce the window size to 1 minute, the problem goes away.

[~fhueske] 
1) So on each slide of a time window, the data in that window is replicated and 
put into the new window?  That seems like a lot of overhead (see the quick 
arithmetic after this list).  Why is everything replicated, and is there any way 
to disable that?
2) While this example could be implemented in a ReduceFunction (a sketch of that 
version is included below for reference), I also need to support more complicated 
calculations such as a linear regression or an exponential moving average, where 
you need access to all the data in the window.  That is why I've been stress 
testing with the WindowFunction.
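
On the size of that overhead (assuming each record really is copied into every 
window it overlaps, as described): a 10-minute window with a 1-second slide means 
every record belongs to 600 windows, so at 10,000 events per second that is 
roughly 6,000,000 window-record insertions per second, compared to 10,000 per 
second if each record were stored only once.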

Perhaps there should be a more performant time window that doesn't replicate 
data on each slide, similar to how the count window is implemented.
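
For reference, here is roughly what the ReduceFunction version of the counting 
example would look like (just a sketch, not tested; it assumes the TestObject 
stream from the job below is first mapped to (key, 1) pairs, and "events" stands 
for that DataStream<TestObject>):

{code:java}
// Sketch only: incremental count per key over the same sliding window,
// pre-aggregating as records arrive instead of buffering every record.
// "events" is assumed to be the DataStream<TestObject> built from the source below.
DataStream<Tuple2<Integer, Long>> counts = events
    .map(new MapFunction<TestObject, Tuple2<Integer, Long>>() {
        @Override
        public Tuple2<Integer, Long> map(TestObject obj) {
            return Tuple2.of(obj.getKey(), 1L);
        }
    })
    .keyBy(0)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
    .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
        @Override
        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                            Tuple2<Integer, Long> b) {
            // Keys are equal within a keyed window, so only the counts are summed.
            return Tuple2.of(a.f0, a.f1 + b.f1);
        }
    });

counts.print();
{code}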

> Poor performance with Sliding Time Windows
> ------------------------------------------
>
>                 Key: FLINK-6990
>                 URL: https://issues.apache.org/jira/browse/FLINK-6990
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Streaming
>    Affects Versions: 1.3.0
>         Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>            Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> 
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> 
> public class FlinkPerfTest {
> 
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // Streaming 10,000 events per second (10 keys x 1,000 events, once per second)
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
> 
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 this.wait();
>             }
> 
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>         .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
> 
>         see.execute();
>     }
> 
>     public static class TestObject {
>         private Integer key;
> 
>         public Integer getKey() {
>             return key;
>         }
> 
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}
> When running this, Flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1-second intervals.  For 
> comparison, you can switch to a count window of similar size, which performs 
> just fine:
> {code:java}
>     .countWindow(600000, 1000)
>     .apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
>         @Override
>         public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>             int count = 0;
>             for (Object obj : input) {
>                 count++;
>             }
>             out.collect(key.getField(0) + ": " + count);
>         }
>     })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window, and I would also expect resource consumption to be similar, but the 
> sliding time window uses significantly more CPU and memory than the count 
> window.
> A possible cause could be that the SystemProcessingTimeService.TriggerTask is 
> locking on the checkpointLock, which acts like a global lock.  There should 
> be a lock per key, or preferably a lock-free solution.


