[ 
https://issues.apache.org/jira/browse/FLINK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14595769#comment-14595769
 ] 

ASF GitHub Bot commented on FLINK-1956:
---------------------------------------

GitHub user mbalassi opened a pull request:

    https://github.com/apache/flink/pull/855

    [streaming] Properly forward rich window function calls to wrapped fu…

    …nctions
    
    Open and close were not called, because of the wrapping in windowing. I 
should have realized this when fixing a former issue. [1]
    
    [1] https://issues.apache.org/jira/browse/FLINK-1956 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbalassi/flink richwindowfunctions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #855
    
----
commit 861a1037ce547a521c1a54995814e7db3095d430
Author: mbalassi <[email protected]>
Date:   2015-06-22T10:45:33Z

    [streaming] Properly forward rich window function calls to wrapped functions

----


> Runtime context not initialized in RichWindowMapFunction
> --------------------------------------------------------
>
>                 Key: FLINK-1956
>                 URL: https://issues.apache.org/jira/browse/FLINK-1956
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Daniel Bali
>            Assignee: Márton Balassi
>              Labels: context, runtime, streaming, window
>             Fix For: 0.9
>
>
> Trying to access the runtime context in a rich window map function results in 
> an exception. The following snippet demonstrates the bug:
> {code}
>     env.generateSequence(0, 1000)
>     .window(Count.of(10))
>     .mapWindow(new RichWindowMapFunction<Long, Tuple2<Long, Long>>() {
>         @Override
>         public void mapWindow(Iterable<Long> input, Collector<Tuple2<Long, 
> Long>> out) throws Exception {
>             long self = getRuntimeContext().getIndexOfThisSubtask();
>             for (long value : input) {
>                 out.collect(new Tuple2<>(self, value));
>             }
>         }
>     }).flatten().print();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to