[ 
https://issues.apache.org/jira/browse/FLINK-2922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974914#comment-14974914
 ] 

Aljoscha Krettek commented on FLINK-2922:
-----------------------------------------

I don't exactly know what you mean by that but this would still require a 
special kind of operator to support that, that's where the queryable window 
operator comes in.

This is a mockup of what it would look like in practice:
{code}
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<String> query = env.socketTextStream("localhost", 9998);

WindowStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> 
winStream = text
        .flatMap(new WordCount.Tokenizer())
        .keyBy(0)
        .countWindow(10)
        .query(query.keyBy(new IdentityKey()))
        .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple, GlobalWindow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void apply(Tuple tuple,
                    GlobalWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> val : values) {
                    sum += val.f1;
                }
                out.collect(Tuple2.of((String) tuple.getField(0), sum));
            }
        });

winStream.print();

// WindowResult<QT, T> 
// QT = query type
// T = window result type
DataStream<WindowResult<String, Tuple2<String, Integer>>> queryResults = 
winStream.getQueryResultStream();
querResults.print();
{code}

> Add Queryable Window Operator
> -----------------------------
>
>                 Key: FLINK-2922
>                 URL: https://issues.apache.org/jira/browse/FLINK-2922
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> The idea is to provide a window operator that allows to query the current 
> window result at any time without discarding the current result.
> For example, a user might have an aggregation window operation with tumbling 
> windows of 1 hour. Now, at any time they might be interested in the current 
> aggregated value for the currently in-flight hour window.
> The idea is to make the operator a two input operator where normal elements 
> arrive on input one while queries arrive on input two. The query stream must 
> be keyed by the same key as the input stream. If an input arrives for a key 
> the current value for that key is emitted along with the query element so 
> that the user can map the result to the query.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to