[
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062338#comment-16062338
]
Jark Wu commented on FLINK-6990:
--------------------------------
[~BriceBingman] [~fhueske] The poor performance of sliding time windows is
evident in our production environment. We have planned to address it with window
pane optimization. I have created an issue, FLINK-7001, for it and will attach a
more detailed design doc there.
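For illustration only, the general idea behind pane-based aggregation can be sketched in plain Java without any Flink APIs. With a 10-minute window sliding every second, each element falls into 600 overlapping windows. If elements are instead counted once into a pane of one slide length, a window result becomes the sum of the panes it covers. The class `PaneSketch` and its methods are hypothetical names chosen for this sketch; it is not the FLINK-7001 design and it only handles a simple count for a single key.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of pane-based counting; not Flink code, not the FLINK-7001 design. */
public class PaneSketch {
    // Number of panes per window, i.e. window size divided by slide.
    private final int panesPerWindow;
    // Counts of closed panes that still belong to the current window, oldest first.
    private final Deque<Long> panes = new ArrayDeque<>();
    // Count for the pane that is still being filled.
    private long currentPane = 0;
    // Running sum over all retained closed panes.
    private long windowSum = 0;

    public PaneSketch(long windowSizeMs, long slideMs) {
        if (slideMs <= 0 || windowSizeMs % slideMs != 0) {
            throw new IllegalArgumentException("window size must be a positive multiple of the slide");
        }
        this.panesPerWindow = (int) (windowSizeMs / slideMs);
    }

    /** How many overlapping sliding windows a single element would belong to. */
    public int windowsPerElement() {
        return panesPerWindow;
    }

    /** Records one element; it is counted once, in the current pane only. */
    public void add() {
        currentPane++;
    }

    /** Closes the current pane (once per slide) and returns the count of the window ending now. */
    public long closePaneAndEmit() {
        panes.addLast(currentPane);
        windowSum += currentPane;
        currentPane = 0;
        // Drop the oldest pane once it has slid out of the window.
        if (panes.size() > panesPerWindow) {
            windowSum -= panes.removeFirst();
        }
        return windowSum;
    }
}
```

In this sketch each element costs one increment regardless of how many windows overlap it, and each slide costs one addition and at most one subtraction.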
> Poor performance with Sliding Time Windows
> ------------------------------------------
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Streaming
> Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
> Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows. Here is a
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         //Streaming 10,000 events per second
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx)
>                     throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 this.wait();
>             }
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>         .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window,
>                     Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
>         see.execute();
>     }
>     public static class TestObject {
>         private Integer key;
>         public Integer getKey() {
>             return key;
>         }
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}
> When running this, Flink periodically pauses for long periods of time. I
> would expect a steady stream of output at 1 second intervals. For
> comparison, you can switch to a count window of similar size, which performs
> just fine:
> {code:java}
> .countWindow(600000, 1000)
> .apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
>     @Override
>     public void apply(Tuple key, GlobalWindow window,
>             Iterable<TestObject> input, Collector<String> out) throws Exception {
>         int count = 0;
>         for (Object obj : input) {
>             count++;
>         }
>         out.collect(key.getField(0) + ": " + count);
>     }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count
> window, with similar resource consumption. Instead, the sliding time window
> uses significantly more CPU and memory than the count window.
> A possible cause could be that SystemProcessingTimeService.TriggerTask
> locks on the checkpointLock, which acts like a global lock. There should
> be a lock per key or, preferably, a lock-free solution.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)