[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fad201bf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fad201bf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fad201bf Branch: refs/heads/table-retraction Commit: fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8 Parents: 5c37e55 Author: Seth Wiesman <[email protected]> Authored: Sun Mar 5 23:07:18 2017 -0500 Committer: Aljoscha Krettek <[email protected]> Committed: Sat Mar 25 16:59:17 2017 +0100 ---------------------------------------------------------------------- .../FoldApplyProcessAllWindowFunction.java | 23 +- .../FoldApplyProcessWindowFunction.java | 23 +- .../InternalProcessApplyAllWindowContext.java | 57 +++++ .../InternalProcessApplyWindowContext.java | 58 +++++ .../windowing/ProcessAllWindowFunction.java | 22 ++ .../windowing/ProcessWindowFunction.java | 24 +- .../ReduceApplyProcessAllWindowFunction.java | 23 +- .../ReduceApplyProcessWindowFunction.java | 21 +- .../windowing/AccumulatingKeyedTimePanes.java | 75 ++++++- .../windowing/EvictingWindowOperator.java | 62 +++--- .../operators/windowing/WindowOperator.java | 220 +++++++++++++++---- ...ternalAggregateProcessAllWindowFunction.java | 28 ++- .../InternalAggregateProcessWindowFunction.java | 28 ++- .../InternalIterableAllWindowFunction.java | 7 +- ...nternalIterableProcessAllWindowFunction.java | 31 ++- .../InternalIterableProcessWindowFunction.java | 24 +- .../InternalIterableWindowFunction.java | 7 +- .../InternalProcessAllWindowContext.java | 57 +++++ .../functions/InternalProcessWindowContext.java | 58 +++++ .../InternalSingleValueAllWindowFunction.java | 7 +- ...rnalSingleValueProcessAllWindowFunction.java | 29 ++- ...nternalSingleValueProcessWindowFunction.java | 24 +- .../InternalSingleValueWindowFunction.java | 7 +- .../functions/InternalWindowFunction.java | 26 ++- .../FoldApplyProcessWindowFunctionTest.java | 82 ++++++- 
.../functions/InternalWindowFunctionTest.java | 49 +++-- .../RegularWindowOperatorContractTest.java | 12 +- .../windowing/WindowOperatorContractTest.java | 158 ++++++++++--- .../function/ProcessAllWindowFunction.scala | 20 ++ .../scala/function/ProcessWindowFunction.scala | 20 ++ .../ScalaProcessWindowFunctionWrapper.scala | 31 +++ 31 files changed, 1091 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java index 5ac6766..8e8e52c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java @@ -50,6 +50,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> private TypeSerializer<ACC> accSerializer; private final TypeInformation<ACC> accTypeInformation; private transient ACC initialValue; + private transient InternalProcessApplyAllWindowContext<ACC, R, W> ctx; public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) { this.windowFunction = windowFunction; @@ -70,6 +71,9 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); DataInputViewStreamWrapper 
in = new DataInputViewStreamWrapper(bais); initialValue = accSerializer.deserialize(in); + + ctx = new InternalProcessApplyAllWindowContext<>(windowFunction); + } @Override @@ -92,12 +96,19 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> result = foldFunction.fold(result, val); } - windowFunction.process(windowFunction.new Context() { - @Override - public W window() { - return context.window(); - } - }, Collections.singletonList(result), out); + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + + windowFunction.process(ctx, Collections.singletonList(result), out); + } + + @Override + public void clear(final Context context) throws Exception { + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + windowFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java index e1bc759..073a2f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java @@ -50,6 +50,7 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R> private TypeSerializer<ACC> accSerializer; private final TypeInformation<ACC> accTypeInformation; private transient ACC initialValue; + private transient 
InternalProcessApplyWindowContext<ACC, R, K, W> ctx; public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) { this.windowFunction = windowFunction; @@ -70,6 +71,8 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R> ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); initialValue = accSerializer.deserialize(in); + + ctx = new InternalProcessApplyWindowContext<>(windowFunction); } @Override @@ -85,19 +88,25 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R> } @Override - public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception { + public void process(K key, Context context, Iterable<T> values, Collector<R> out) throws Exception { ACC result = accSerializer.copy(initialValue); for (T val : values) { result = foldFunction.fold(result, val); } - windowFunction.process(key, windowFunction.new Context() { - @Override - public W window() { - return context.window(); - } - }, Collections.singletonList(result), out); + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + windowFunction.process(key, ctx, Collections.singletonList(result), out); + } + + @Override + public void clear(final Context context) throws Exception{ + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + windowFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java new file mode 100644 index 0000000..e1a0a98 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Internal reusable context wrapper. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <W> The type of the window. 
+ */ +@Internal +public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window> + extends ProcessAllWindowFunction<IN, OUT, W>.Context { + + W window; + KeyedStateStore windowState; + KeyedStateStore globalState; + + InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) { + function.super(); + } + + @Override + public W window() { + return window; + } + + @Override + public KeyedStateStore windowState() { + return windowState; + } + + @Override + public KeyedStateStore globalState() { + return globalState; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java new file mode 100644 index 0000000..f547adc --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Internal reusable context wrapper. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <KEY> The type of the window key. + * @param <W> The type of the window. + */ +@Internal +public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window> + extends ProcessWindowFunction<IN, OUT, KEY, W>.Context { + + W window; + KeyedStateStore windowState; + KeyedStateStore globalState; + + InternalProcessApplyWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) { + function.super(); + } + + @Override + public W window() { + return window; + } + + @Override + public KeyedStateStore windowState() { + return windowState; + } + + @Override + public KeyedStateStore globalState() { + return globalState; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java index 622e020..f49aa27 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing; import 
org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -48,6 +49,14 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** + * Deletes any state in the {@code Context} when the Window is purged. + * + * @param context The context to which the window is being evaluated + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public void clear(Context context) throws Exception {} + + /** * The context holding window metadata */ public abstract class Context { @@ -55,5 +64,18 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem * @return The window that is being evaluated. */ public abstract W window(); + + /** + * State accessor for per-key and per-window state. + * + * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up + * by implementing {@link ProcessAllWindowFunction#clear(ProcessAllWindowFunction.Context)}. + */ + public abstract KeyedStateStore windowState(); + + /** + * State accessor for per-key global state. 
+ */ + public abstract KeyedStateStore globalState(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java index 9c48e24..bcefaf7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -50,12 +51,33 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** + * Deletes any state in the {@code Context} when the Window is purged. + * + * @param context The context to which the window is being evaluated + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public void clear(Context context) throws Exception {} + + /** * The context holding window metadata */ - public abstract class Context { + public abstract class Context implements java.io.Serializable { /** * @return The window that is being evaluated. */ public abstract W window(); + + /** + * State accessor for per-key and per-window state. 
+ * + * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up + * by implementing {@link ProcessWindowFunction#clear(Context)}. + */ + public abstract KeyedStateStore windowState(); + + /** + * State accessor for per-key global state. + */ + public abstract KeyedStateStore globalState(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java index 142c71e..4c54c94 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java @@ -35,6 +35,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> private final ReduceFunction<T> reduceFunction; private final ProcessAllWindowFunction<T, R, W> windowFunction; + private transient InternalProcessApplyAllWindowContext<T, R, W> ctx; public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) { this.windowFunction = windowFunction; @@ -52,17 +53,27 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> curr = reduceFunction.reduce(curr, val); } } - windowFunction.process(windowFunction.new Context() { - @Override - public W window() { - return context.window(); - } - }, Collections.singletonList(curr), out); + + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState 
= context.globalState(); + + windowFunction.process(ctx, Collections.singletonList(curr), out); + } + + @Override + public void clear(final Context context) throws Exception { + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + + windowFunction.clear(ctx); } @Override public void open(Configuration configuration) throws Exception { FunctionUtils.openFunction(this.windowFunction, configuration); + ctx = new InternalProcessApplyAllWindowContext<>(windowFunction); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java index 9ea1fdf..1af783a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java @@ -35,6 +35,7 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R> private final ReduceFunction<T> reduceFunction; private final ProcessWindowFunction<T, R, K, W> windowFunction; + private transient InternalProcessApplyWindowContext<T, R, K, W> ctx; public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) { this.windowFunction = windowFunction; @@ -52,17 +53,25 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R> curr = reduceFunction.reduce(curr, val); } } - windowFunction.process(k, windowFunction.new 
Context() { - @Override - public W window() { - return context.window(); - } - }, Collections.singletonList(curr), out); + + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + windowFunction.process(k, ctx, Collections.singletonList(curr), out); + } + + @Override + public void clear(final Context context) throws Exception { + this.ctx.window = context.window(); + this.ctx.windowState = context.windowState(); + this.ctx.globalState = context.globalState(); + windowFunction.clear(ctx); } @Override public void open(Configuration configuration) throws Exception { FunctionUtils.openFunction(this.windowFunction, configuration); + ctx = new InternalProcessApplyWindowContext<>(windowFunction); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index 87c5aca..d58b5cc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -19,6 +19,17 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import 
org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.util.UnionIterator; @@ -38,6 +49,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function; + private final AccumulatingKeyedTimePanesContext context; + /** * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */ private long evaluationPass = 1L; @@ -47,6 +60,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) { this.keySelector = keySelector; this.function = function; + this.context = new AccumulatingKeyedTimePanesContext(); } // ------------------------------------------------------------------------ @@ -67,13 +81,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { Key key = entry.getKey(); operator.setCurrentKey(key); - function.apply(entry.getKey(), window, entry.getValue(), out); + context.globalState = operator.getKeyedStateStore(); + + function.process(entry.getKey(), window, context, entry.getValue(), out); } } else { // general code path for multi-pane case WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>( - 
function, window, out, operator); + function, window, out, operator, context); traverseAllPanes(evaluator, evaluationPass); } @@ -95,17 +111,19 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private final TimeWindow window; private final AbstractStreamOperator<Result> contextOperator; - + private Key currentKey; + private AccumulatingKeyedTimePanesContext context; WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window, - Collector<Result> out, AbstractStreamOperator<Result> contextOperator) { + Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) { this.function = function; this.out = out; this.unionIterator = new UnionIterator<>(); this.window = window; this.contextOperator = contextOperator; + this.context = context; } @@ -123,7 +141,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed @Override public void keyDone() throws Exception { contextOperator.setCurrentKey(currentKey); - function.apply(currentKey, window, unionIterator, out); + context.globalState = contextOperator.getKeyedStateStore(); + function.process(currentKey, window, context, unionIterator, out); } } @@ -136,6 +155,52 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY; } + private static class ThrowingKeyedStateStore implements KeyedStateStore { + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); + } + + @Override + public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); + } + + @Override + public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); + } + + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); + } + } + + private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext { + KeyedStateStore globalState; + KeyedStateStore throwingStore; + + public AccumulatingKeyedTimePanesContext() { + this.throwingStore = new ThrowingKeyedStateStore(); + } + + @Override + public KeyedStateStore windowState() { + return throwingStore; + } + + @Override + public KeyedStateStore globalState() { + return globalState; + } + } + private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 24c8d32..85451a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -134,14 +134,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> " window: " + mergeResult); } - context.key = key; - context.window = mergeResult; + triggerContext.key = key; + triggerContext.window = mergeResult; - context.onMerge(mergedWindows); + triggerContext.onMerge(mergedWindows); for (W m : mergedWindows) { - context.window = m; - context.clear(); + triggerContext.window = m; + triggerContext.clear(); deleteCleanupTimer(m); } @@ -165,12 +165,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictingWindowState.setCurrentNamespace(stateWindow); evictingWindowState.add(element); - context.key = key; - context.window = actualWindow; + triggerContext.key = key; + triggerContext.window = actualWindow; evictorContext.key = key; evictorContext.window = actualWindow; - TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); @@ -201,12 +201,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictingWindowState.setCurrentNamespace(window); evictingWindowState.add(element); - context.key = key; - context.window = window; + triggerContext.key = key; + triggerContext.window = window; evictorContext.key = key; evictorContext.window = window; - TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); @@ -236,8 +236,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> @Override public void onEventTime(InternalTimer<K, W> timer) throws Exception { - context.key = timer.getKey(); - context.window = timer.getNamespace(); + triggerContext.key = 
timer.getKey(); + triggerContext.window = timer.getNamespace(); evictorContext.key = timer.getKey(); evictorContext.window = timer.getNamespace(); @@ -245,7 +245,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); + W stateWindow = mergingWindows.getStateWindow(triggerContext.window); if (stateWindow == null) { // Timer firing for non-existent window, this can only happen if a // trigger did not clean up timers. We have already cleared the merging @@ -255,23 +255,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictingWindowState.setCurrentNamespace(stateWindow); } } else { - evictingWindowState.setCurrentNamespace(context.window); + evictingWindowState.setCurrentNamespace(triggerContext.window); } Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents != null) { - TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp()); if (triggerResult.isFire()) { - emitWindowContents(context.window, contents, evictingWindowState); + emitWindowContents(triggerContext.window, contents, evictingWindowState); } if (triggerResult.isPurge()) { evictingWindowState.clear(); } } - if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { - clearAllState(context.window, evictingWindowState, mergingWindows); + if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { + clearAllState(triggerContext.window, evictingWindowState, mergingWindows); } if (mergingWindows != null) { @@ -282,8 +282,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> @Override public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { - context.key = timer.getKey(); - context.window = 
timer.getNamespace(); + triggerContext.key = timer.getKey(); + triggerContext.window = timer.getNamespace(); evictorContext.key = timer.getKey(); evictorContext.window = timer.getNamespace(); @@ -291,7 +291,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); + W stateWindow = mergingWindows.getStateWindow(triggerContext.window); if (stateWindow == null) { // Timer firing for non-existent window, this can only happen if a // trigger did not clean up timers. We have already cleared the merging @@ -301,23 +301,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictingWindowState.setCurrentNamespace(stateWindow); } } else { - evictingWindowState.setCurrentNamespace(context.window); + evictingWindowState.setCurrentNamespace(triggerContext.window); } Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); if (contents != null) { - TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); if (triggerResult.isFire()) { - emitWindowContents(context.window, contents, evictingWindowState); + emitWindowContents(triggerContext.window, contents, evictingWindowState); } if (triggerResult.isPurge()) { evictingWindowState.clear(); } } - if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { - clearAllState(context.window, evictingWindowState, mergingWindows); + if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { + clearAllState(triggerContext.window, evictingWindowState, mergingWindows); } if (mergingWindows != null) { @@ -348,7 +348,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> } }); - userFunction.apply(context.key, context.window, projectedContents, 
timestampedCollector); + processContext.window = triggerContext.window; + userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector); evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); @@ -364,9 +365,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> W window, ListState<StreamRecord<IN>> windowState, MergingWindowSet<W> mergingWindows) throws Exception { - windowState.clear(); - context.clear(); + triggerContext.clear(); + processContext.window = window; + processContext.clear(); if (mergingWindows != null) { mergingWindows.retireWindow(window); mergingWindows.persist(); http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 3745659..3d40716 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -23,8 +23,16 @@ import org.apache.commons.math3.util.ArithmeticUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.AppendingState; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import 
org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -159,7 +167,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> /** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector<OUT> timestampedCollector; - protected transient Context context = new Context(null, null); + protected transient Context triggerContext = new Context(null, null); + + protected transient WindowContext processContext = new WindowContext(null); protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; @@ -264,7 +274,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> internalTimerService = getInternalTimerService("window-timers", windowSerializer, this); - context = new Context(null, null); + triggerContext = new Context(null, null); + processContext = new WindowContext( null); windowAssignerContext = new WindowAssigner.WindowAssignerContext() { @Override @@ -317,7 +328,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public void close() throws Exception { super.close(); timestampedCollector = null; - context = null; + triggerContext = null; + processContext = null; windowAssignerContext = null; } @@ -325,7 +337,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public void dispose() throws Exception { super.dispose(); timestampedCollector = null; - context = null; + triggerContext = null; + processContext = null; windowAssignerContext = null; } @@ -365,14 +378,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> " 
window: " + mergeResult); } - context.key = key; - context.window = mergeResult; + triggerContext.key = key; + triggerContext.window = mergeResult; - context.onMerge(mergedWindows); + triggerContext.onMerge(mergedWindows); for (W m: mergedWindows) { - context.window = m; - context.clear(); + triggerContext.window = m; + triggerContext.clear(); deleteCleanupTimer(m); } @@ -396,10 +409,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowState.setCurrentNamespace(stateWindow); windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + triggerContext.key = key; + triggerContext.window = actualWindow; - TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); @@ -429,10 +442,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowState.setCurrentNamespace(window); windowState.add(element.getValue()); - context.key = key; - context.window = window; + triggerContext.key = key; + triggerContext.window = window; - TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); @@ -460,14 +473,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void onEventTime(InternalTimer<K, W> timer) throws Exception { - context.key = timer.getKey(); - context.window = timer.getNamespace(); + triggerContext.key = timer.getKey(); + triggerContext.window = timer.getNamespace(); MergingWindowSet<W> mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); + W stateWindow = mergingWindows.getStateWindow(triggerContext.window); if (stateWindow == null) { // Timer firing for non-existent window, this can only happen 
if a // trigger did not clean up timers. We have already cleared the merging @@ -477,7 +490,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowState.setCurrentNamespace(stateWindow); } } else { - windowState.setCurrentNamespace(context.window); + windowState.setCurrentNamespace(triggerContext.window); mergingWindows = null; } @@ -487,17 +500,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } if (contents != null) { - TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp()); if (triggerResult.isFire()) { - emitWindowContents(context.window, contents); + emitWindowContents(triggerContext.window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } } - if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { - clearAllState(context.window, windowState, mergingWindows); + if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { + clearAllState(triggerContext.window, windowState, mergingWindows); } if (mergingWindows != null) { @@ -508,14 +521,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { - context.key = timer.getKey(); - context.window = timer.getNamespace(); + triggerContext.key = timer.getKey(); + triggerContext.window = timer.getNamespace(); MergingWindowSet<W> mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); + W stateWindow = mergingWindows.getStateWindow(triggerContext.window); if (stateWindow == null) { // Timer firing for non-existent window, this can only happen if a // trigger did not clean up timers. 
We have already cleared the merging @@ -525,7 +538,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowState.setCurrentNamespace(stateWindow); } } else { - windowState.setCurrentNamespace(context.window); + windowState.setCurrentNamespace(triggerContext.window); mergingWindows = null; } @@ -535,17 +548,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } if (contents != null) { - TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); if (triggerResult.isFire()) { - emitWindowContents(context.window, contents); + emitWindowContents(triggerContext.window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } } - if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { - clearAllState(context.window, windowState, mergingWindows); + if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { + clearAllState(triggerContext.window, windowState, mergingWindows); } if (mergingWindows != null) { @@ -559,14 +572,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * {@link Trigger#clear(Window, Trigger.TriggerContext)}. * * <p>The caller must ensure that the - * correct key is set in the state backend and the context object. + * correct key is set in the state backend and the triggerContext object. 
*/ private void clearAllState( W window, AppendingState<IN, ACC> windowState, MergingWindowSet<W> mergingWindows) throws Exception { windowState.clear(); - context.clear(); + triggerContext.clear(); + processContext.window = window; + processContext.clear(); if (mergingWindows != null) { mergingWindows.retireWindow(window); mergingWindows.persist(); @@ -579,7 +594,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @SuppressWarnings("unchecked") private void emitWindowContents(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); - userFunction.apply(context.key, context.window, contents, timestampedCollector); + processContext.window = window; + userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector); } /** @@ -636,9 +652,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } if (windowAssigner.isEventTime()) { - context.registerEventTimeTimer(cleanupTime); + triggerContext.registerEventTimeTimer(cleanupTime); } else { - context.registerProcessingTimeTimer(cleanupTime); + triggerContext.registerProcessingTimeTimer(cleanupTime); } } @@ -654,9 +670,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> return; } if (windowAssigner.isEventTime()) { - context.deleteEventTimeTimer(cleanupTime); + triggerContext.deleteEventTimeTimer(cleanupTime); } else { - context.deleteProcessingTimeTimer(cleanupTime); + triggerContext.deleteProcessingTimeTimer(cleanupTime); } } @@ -686,6 +702,134 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } /** + * Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window + * state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}. 
+ */ + public abstract class AbstractPerWindowStateStore implements KeyedStateStore { + + // we have this in the base class even though it's not used in MergingWindowStateStore so that + // we can always set it and ignore what actual implementation we have + protected W window; + } + + /** + * Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state. + */ + public class MergingWindowStateStore extends AbstractPerWindowStateStore { + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); + } + + @Override + public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); + } + + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows."); + } + } + + /** + * Regular per-window state store for use with + * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+ */ + public class PerWindowStateStore extends AbstractPerWindowStateStore { + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + + @Override + public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + } + + /** + * A utility class for handling {@code ProcessWindowFunction} invocations. This can be reused + * by setting the {@code window} field. No internal state must be kept in the + * {@code WindowContext}.
+ */ + public class WindowContext implements InternalWindowFunction.InternalWindowContext { + protected W window; + + protected AbstractPerWindowStateStore windowState; + + public WindowContext(W window) { + this.window = window; + this.windowState = windowAssigner instanceof MergingWindowAssigner ? new MergingWindowStateStore() : new PerWindowStateStore(); + } + + @Override + public String toString() { + return "WindowContext{Window = " + window.toString() + "}"; + } + + public void clear() throws Exception { + userFunction.clear(window, this); + } + + @Override + public KeyedStateStore windowState() { + this.windowState.window = this.window; + return this.windowState; + } + + @Override + public KeyedStateStore globalState() { + return WindowOperator.this.getKeyedStateStore(); + } + } + + /** * {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused * by setting the {@code key} and {@code window} fields. No internal state must be kept in * the {@code Context} http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java index 9533c95..83e896d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java @@ -21,6 +21,7 @@ import 
org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -45,6 +46,8 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext private final AggregateFunction<T, ACC, V> aggFunction; + private transient InternalProcessAllWindowContext<V, R, W> ctx; + public InternalAggregateProcessAllWindowFunction( AggregateFunction<T, ACC, V> aggFunction, ProcessAllWindowFunction<V, R, W> windowFunction) { @@ -53,22 +56,31 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext } @Override - public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception { + public void open(Configuration parameters) throws Exception { + super.open(parameters); ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction; - ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() { - @Override - public W window() { - return window; - } - }; + this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction); + } + @Override + public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception { final ACC acc = aggFunction.createAccumulator(); for (T val : input) { aggFunction.add(val, acc); } - wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out); + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.process(ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out); + } + + @Override + public void clear(final W window, final InternalWindowContext context) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + wrappedFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java index 433da9b..e14c9bd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java @@ -46,30 +46,36 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext private final AggregateFunction<T, ACC, V> aggFunction; + private final InternalProcessWindowContext<V, R, K, W> ctx; + public InternalAggregateProcessWindowFunction( AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction) { super(windowFunction); this.aggFunction = aggFunction; + this.ctx = new InternalProcessWindowContext<>(windowFunction); } - - @Override - public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception { - ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction; - ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() { - @Override - public W window() { - return window; - } 
- }; + @Override + public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception { final ACC acc = aggFunction.createAccumulator(); for (T val : input) { aggFunction.add(val, acc); } - wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out); + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.process(key, ctx, Collections.singletonList(aggFunction.getResult(acc)), out); + } + + @Override + public void clear(final W window, final InternalWindowContext context) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java index 672bdb6..f2507ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java @@ -39,11 +39,16 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window> } @Override - public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + public void 
process(Byte key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception { wrappedFunction.apply(window, input, out); } @Override + public void clear(W window, InternalWindowContext context) throws Exception { + + } + + @Override public RuntimeContext getRuntimeContext() { throw new RuntimeException("This should never be called."); } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java index e33cc2a..47b7d55 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -34,21 +35,33 @@ public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends W private static final long serialVersionUID = 1L; + private 
transient InternalProcessAllWindowContext<IN, OUT, W> ctx; + public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) { super(wrappedFunction); } @Override - public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + public void open(Configuration parameters) throws Exception { + super.open(parameters); + ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; + this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction); + } + + @Override + public void process(Byte key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.process(ctx, input, out); + } + + @Override + public void clear(final W window, final InternalWindowContext context) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; - ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() { - @Override - public W window() { - return window; - } - }; - - wrappedFunction.process(context, input, out); + wrappedFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java index de516a5..7eb015e 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java @@ -34,21 +34,27 @@ public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends private static final long serialVersionUID = 1L; + private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx; + public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) { super(wrappedFunction); + this.ctx = new InternalProcessWindowContext<>(wrappedFunction); + } + + @Override + public void process(KEY key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.process(key, ctx, input, out); } @Override - public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + public void clear(final W window, final InternalWindowContext context) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction; - ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() { - @Override - public W window() { - return window; - } - }; - - wrappedFunction.process(key, context, input, out); + wrappedFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java index 895b31f..e2f1517 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java @@ -39,11 +39,16 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window } @Override - public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception { + public void process(KEY key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception { wrappedFunction.apply(key, window, input, out); } @Override + public void clear(W window, InternalWindowContext context) throws Exception { + + } + + @Override public RuntimeContext getRuntimeContext() { throw new RuntimeException("This should never be called."); } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java new file mode 100644 index 0000000..c70e161 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java @@ -0,0 +1,57 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Internal reusable context wrapper. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <W> The type of the window. 
+ */ +@Internal +public class InternalProcessAllWindowContext<IN, OUT, W extends Window> + extends ProcessAllWindowFunction<IN, OUT, W>.Context { + + W window; + InternalWindowFunction.InternalWindowContext internalContext; + + InternalProcessAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) { + function.super(); + } + + @Override + public W window() { + return window; + } + + @Override + public KeyedStateStore windowState() { + return internalContext.windowState(); + } + + @Override + public KeyedStateStore globalState() { + return internalContext.globalState(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java new file mode 100644 index 0000000..0f1c0ee --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Internal reusable context wrapper. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <KEY> The type of the key. + * @param <W> The type of the window. + */ +@Internal +public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window> + extends ProcessWindowFunction<IN, OUT, KEY, W>.Context { + + W window; + InternalWindowFunction.InternalWindowContext internalContext; + + InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) { + function.super(); + } + + @Override + public W window() { + return window; + } + + @Override + public KeyedStateStore windowState() { + return internalContext.windowState(); + } + + @Override + public KeyedStateStore globalState() { + return internalContext.globalState(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java index a34d3ec..e90bcf4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java @@ -41,11 +41,16 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo } @Override - public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception { + public void process(Byte key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception { wrappedFunction.apply(window, Collections.singletonList(input), out); } @Override + public void clear(W window, InternalWindowContext context) throws Exception { + + } + + @Override public RuntimeContext getRuntimeContext() { throw new RuntimeException("This should never be called."); } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java index 0284ef7..f7c6a08 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -36,21 +37,33 @@ public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extend private static final long serialVersionUID = 1L; + private transient InternalProcessAllWindowContext<IN, OUT, W> ctx; + public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) { super(wrappedFunction); } @Override - public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception { + public void open(Configuration parameters) throws Exception { + super.open(parameters); + ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; + this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction); + } + + @Override + public void process(Byte key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; - ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() { - @Override - public W window() { - return window; - } - }; + wrappedFunction.process(ctx, Collections.singletonList(input), out); + } - wrappedFunction.process(context, Collections.singletonList(input), out); + @Override + 
public void clear(final W window, final InternalWindowContext context) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java index 7a4e8c6..21d1639 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java @@ -36,21 +36,29 @@ public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W exte private static final long serialVersionUID = 1L; + private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx; + public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) { super(wrappedFunction); + ctx = new InternalProcessWindowContext<>(wrappedFunction); } @Override - public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception { + public void process(KEY key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction; - 
ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() { - @Override - public W window() { - return window; - } - }; + wrappedFunction.process(key, ctx, Collections.singletonList(input), out); + } - wrappedFunction.process(key, context, Collections.singletonList(input), out); + @Override + public void clear(final W window, final InternalWindowContext context) throws Exception { + this.ctx.window = window; + this.ctx.internalContext = context; + + ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction; + wrappedFunction.clear(ctx); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java index 9a0a447..d5cc4a2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java @@ -41,11 +41,16 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win } @Override - public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception { + public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception { wrappedFunction.apply(key, window, Collections.singletonList(input), out); } @Override + public void clear(W window, InternalWindowContext context) throws Exception { + + } + + @Override 
public RuntimeContext getRuntimeContext() { throw new RuntimeException("This should never be called."); } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java index 2eb4052..9834480 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -29,15 +30,28 @@ import org.apache.flink.util.Collector; * @param <KEY> The type of the key. */ public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function { - /** * Evaluates the window and outputs none or several elements. * - * @param key The key for which this window is evaluated. - * @param window The window that is being evaluated. - * @param input The elements in the window being evaluated. - * @param out A collector for emitting elements. + * @param context The context in which the window is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. 
+ * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception; + void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception; + + /** + * Deletes any state in the {@code Context} when the Window is purged. + * + * @param context The context in which the window is being evaluated. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + void clear(W window, InternalWindowContext context) throws Exception; + + interface InternalWindowContext extends java.io.Serializable { + KeyedStateStore windowState(); + + KeyedStateStore globalState(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java index 4b479f3..c4bed37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java @@ -21,20 +21,28 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.KeyedStateStore; +import 
org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.ByteSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -45,8 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction; import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.util.Collector; -import org.junit.Test; import org.junit.Assert; +import org.junit.Test; import java.util.ArrayList; import java.util.List; @@ -139,12 +147,26 @@ public class FoldApplyProcessWindowFunctionTest { expected.add(initValue); - foldWindowFunction.process(0, foldWindowFunction.new Context() { + FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() { @Override public TimeWindow window() { return new TimeWindow(0, 1); } - }, input, new ListCollector<>(result)); + + @Override + public KeyedStateStore windowState() { + return new DummyKeyedStateStore(); + } + + @Override + public KeyedStateStore globalState() { + return new DummyKeyedStateStore(); + } + }; + + foldWindowFunction.open(new Configuration()); + + foldWindowFunction.process(0, ctx, input, new ListCollector<>(result)); Assert.assertEquals(expected, result); } @@ -234,16 +256,58 @@ public class FoldApplyProcessWindowFunctionTest { expected.add(initValue); - foldWindowFunction.process(foldWindowFunction.new Context() { + FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() { @Override public TimeWindow window() { return new TimeWindow(0, 1); } - }, input, new ListCollector<>(result)); + + @Override + public KeyedStateStore windowState() { + return new DummyKeyedStateStore(); + } + + @Override + public KeyedStateStore globalState() { + return new DummyKeyedStateStore(); + } + }; + + foldWindowFunction.open(new Configuration()); + + foldWindowFunction.process(ctx, input, new ListCollector<>(result)); Assert.assertEquals(expected, result); } + public static class DummyKeyedStateStore implements KeyedStateStore { + + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + return null; + } + + @Override + 
public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + return null; + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { + return null; + } + + @Override + public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { + return null; + } + + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + return null; + } + } + public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { public DummyStreamExecutionEnvironment() {
