[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887802#comment-15887802 ]
ASF GitHub Bot commented on FLINK-5157:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2946#discussion_r103425262
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
@@ -642,31 +980,71 @@ public AllWindowedStream(DataStream<T> input,
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
+ String callLocation = Utils.getCallLocationName();
+ function = input.getExecutionEnvironment().clean(function);
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true,
getInputType(), null, false);
-
- return apply(function, resultType);
+ return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}
/**
* Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
+ * evaluation of the window. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>
* Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
- * @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+ String callLocation = Utils.getCallLocationName();
+ function = input.getExecutionEnvironment().clean(function);
+ return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
+ }
- //clean the closure
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Not that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of incremental aggregation.
+ *
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
--- End diff --
This method and the ones that follow it should be annotated with `@PublicEvolving`.
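
To illustrate the review comment, here is a minimal sketch of marking only a newly added method as `@PublicEvolving` while an existing method stays unannotated. It deliberately does not depend on Flink: the nested `PublicEvolving` annotation is a local stand-in for `org.apache.flink.annotation.PublicEvolving`, and `WindowedStreamSketch`, its `String`-based methods, and the `isPublicEvolving` helper are hypothetical names invented for this example, not part of the pull request.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class PublicEvolvingSketch {

    // ASSUMPTION: local stand-in for Flink's org.apache.flink.annotation.PublicEvolving,
    // declared here only so the sketch compiles without Flink on the classpath.
    // Runtime retention is chosen so the reflection check below can see it.
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
    public @interface PublicEvolving {}

    // Hypothetical stand-in for AllWindowedStream; the real methods take
    // window functions and return SingleOutputStreamOperator, not String.
    public static class WindowedStreamSketch {

        // Existing method, left unannotated in this sketch.
        public String apply(String function) {
            return "apply:" + function;
        }

        // Newly added method, marked as public API that may still change.
        @PublicEvolving
        public String process(String function) {
            return "process:" + function;
        }
    }

    // Returns true if any public method with the given name carries the annotation.
    public static boolean isPublicEvolving(Class<?> type, String methodName) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(methodName) && m.isAnnotationPresent(PublicEvolving.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isPublicEvolving(WindowedStreamSketch.class, "process")); // true
        System.out.println(isPublicEvolving(WindowedStreamSketch.class, "apply"));   // false
    }
}
```

In the actual pull request the annotation would be imported from Flink rather than declared locally, and it would sit directly above each new `process(...)` overload.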
> Extending AllWindow Function Metadata
> -------------------------------------
>
> Key: FLINK-5157
> URL: https://issues.apache.org/jira/browse/FLINK-5157
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, Streaming
> Reporter: Ventura Del Monte
> Assignee: Ventura Del Monte
>
> Following the logic behind [1,2], ProcessAllWindowFunction can be introduced
> in Flink, and AllWindowedStream can be extended to support it.
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> [2] https://issues.apache.org/jira/browse/FLINK-4997