[
https://issues.apache.org/jira/browse/FLINK-2922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974914#comment-14974914
]
Aljoscha Krettek commented on FLINK-2922:
-----------------------------------------
I'm not sure exactly what you mean by that, but it would still require a
special kind of operator to support it. That is where the queryable window
operator comes in.
This is a mockup of what it would look like in practice:
{code}
// Input one: normal elements. Input two: queries.
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<String> query = env.socketTextStream("localhost", 9998);

// Mockup: query(...) attaches the keyed query stream to the window.
WindowStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> winStream = text
    .flatMap(new WordCount.Tokenizer())
    .keyBy(0)
    .countWindow(10)
    .query(query.keyBy(new IdentityKey()))
    .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, GlobalWindow>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void apply(Tuple tuple,
                GlobalWindow window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Sum the counts of all elements in the window for this key.
            int sum = 0;
            for (Tuple2<String, Integer> val : values) {
                sum += val.f1;
            }
            out.collect(Tuple2.of((String) tuple.getField(0), sum));
        }
    });

// Regular window results.
winStream.print();

// WindowResult<QT, T>
// QT = query type
// T = window result type
DataStream<WindowResult<String, Tuple2<String, Integer>>> queryResults =
    winStream.getQueryResultStream();
queryResults.print();
{code}
> Add Queryable Window Operator
> -----------------------------
>
> Key: FLINK-2922
> URL: https://issues.apache.org/jira/browse/FLINK-2922
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> The idea is to provide a window operator that allows querying the current
> window result at any time without discarding the current result.
> For example, a user might have an aggregation window operation with tumbling
> windows of 1 hour. Now, at any time they might be interested in the current
> aggregated value for the currently in-flight hour window.
> The idea is to make the operator a two input operator where normal elements
> arrive on input one while queries arrive on input two. The query stream must
> be keyed by the same key as the input stream. If a query arrives for a key,
> the current value for that key is emitted along with the query element so
> that the user can map the result to the query.
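
The two-input behaviour described above can be sketched in plain Java, without any Flink dependency. This is only an illustration of the semantics, not the proposed operator: the class name QueryableWindowSketch, the methods processElement and processQuery, and this simplified WindowResult are hypothetical names. The sketch also assumes a per-key count window that sums integers, and it assumes that a query for a key with no in-flight window returns nothing, which the issue does not specify.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch of a two-input queryable count window (hypothetical, not Flink API). */
class QueryableWindowSketch {

    /** Pairs a query element with the in-flight window value it observed. */
    static final class WindowResult<QT, T> {
        final QT query;
        final T result;

        WindowResult(QT query, T result) {
            this.query = query;
            this.result = result;
        }
    }

    private final int windowSize;
    // Per-key running sum and element count of the in-flight window.
    private final Map<String, Integer> sums = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();

    QueryableWindowSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Input one: adds an element; returns the sum when the count window fires. */
    Optional<Integer> processElement(String key, int value) {
        int sum = sums.merge(key, value, Integer::sum);
        int count = counts.merge(key, 1, Integer::sum);
        if (count < windowSize) {
            return Optional.empty();
        }
        // Window is full: emit the result and discard the state for this key.
        sums.remove(key);
        counts.remove(key);
        return Optional.of(sum);
    }

    /** Input two: reads the current value for the key without discarding it. */
    Optional<WindowResult<String, Integer>> processQuery(String key) {
        Integer current = sums.get(key);
        if (current == null) {
            // Assumption: no in-flight window for this key means no query result.
            return Optional.empty();
        }
        return Optional.of(new WindowResult<>(key, current));
    }

    public static void main(String[] args) {
        QueryableWindowSketch op = new QueryableWindowSketch(3);
        op.processElement("flink", 1);
        op.processElement("flink", 2);
        // The query sees the partial sum; the window state is left untouched.
        op.processQuery("flink")
            .ifPresent(r -> System.out.println(r.query + " -> " + r.result));
        // The third element completes the window, which fires as usual.
        op.processElement("flink", 4)
            .ifPresent(sum -> System.out.println("window fired: " + sum));
    }
}
```

Returning the query element together with the value is what lets a caller match each answer to the query that produced it, as the description requires.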