[FLINK-6164] Make ProcessWindowFunction a RichFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e76a0aa9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e76a0aa9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e76a0aa9 Branch: refs/heads/release-1.3 Commit: e76a0aa9e2324c4647509379fe4125d8dc576ff0 Parents: e5adf11 Author: zentol <[email protected]> Authored: Wed May 3 15:57:05 2017 +0200 Committer: zentol <[email protected]> Committed: Tue May 9 21:07:33 2017 +0200 ---------------------------------------------------------------------- .../FoldApplyProcessAllWindowFunction.java | 2 +- .../FoldApplyProcessWindowFunction.java | 2 +- .../windowing/ProcessAllWindowFunction.java | 4 +- .../windowing/ProcessWindowFunction.java | 4 +- .../ReduceApplyProcessAllWindowFunction.java | 3 +- .../ReduceApplyProcessWindowFunction.java | 2 +- .../windowing/RichProcessAllWindowFunction.java | 53 ++------------------ .../windowing/RichProcessWindowFunction.java | 53 ++------------------ .../functions/InternalWindowFunctionTest.java | 11 ++-- ...AlignedProcessingTimeWindowOperatorTest.java | 4 +- .../function/ProcessAllWindowFunction.scala | 7 ++- .../scala/function/ProcessWindowFunction.scala | 6 ++- .../function/RichProcessAllWindowFunction.scala | 53 +------------------- .../function/RichProcessWindowFunction.scala | 53 +------------------- .../ScalaProcessWindowFunctionWrapper.scala | 20 +++----- ...ngIdentityRichProcessAllWindowFunction.scala | 4 +- ...ckingIdentityRichProcessWindowFunction.scala | 4 +- 17 files changed, 45 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java index 8982c71..1d39252 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java @@ -45,7 +45,7 @@ import org.apache.flink.util.Collector; @Internal @Deprecated public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> - extends RichProcessAllWindowFunction<T, R, W> + extends ProcessAllWindowFunction<T, R, W> implements OutputTypeConfigurable<R> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java index 0e0356a..fa4fe86 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java @@ -45,7 +45,7 @@ import org.apache.flink.util.Collector; @Internal @Deprecated public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R> - extends RichProcessWindowFunction<T, R, K, W> + extends ProcessWindowFunction<T, R, K, W> implements OutputTypeConfigurable<R> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java index 4d247a7..34a37bf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -33,7 +33,7 @@ import org.apache.flink.util.Collector; * @param <W> The type of {@code Window} that this window function can be applied on. */ @PublicEvolving -public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function { +public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java index 2c80e9e..506b610 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -34,7 +34,7 @@ import org.apache.flink.util.Collector; * @param <W> The type of {@code Window} that this window function can be applied on. */ @PublicEvolving -public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { +public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java index d1f9ccd..e7e6609 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java @@ -32,8 +32,7 @@ import org.apache.flink.util.Collector; * {@link ReduceFunction}. */ @Internal -public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> - extends RichProcessAllWindowFunction<T, R, W> { +public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends ProcessAllWindowFunction<T, R, W> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java index 836726d..18037b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java @@ -33,7 +33,7 @@ import org.apache.flink.util.Collector; */ @Internal public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R> - extends RichProcessWindowFunction<T, R, K, W> { + extends ProcessWindowFunction<T, R, K, W> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java index 1130fa5..a800870 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java @@ -19,10 +19,6 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.windows.Window; /** @@ -32,53 +28,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * @param <IN> The type of the input value. * @param <OUT> The type of the output value. * @param <W> The type of {@code Window} that this window function can be applied on. + * + * @deprecated use {@link ProcessAllWindowFunction} instead */ @PublicEvolving -public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> - extends ProcessAllWindowFunction<IN, OUT, W> - implements RichFunction { +@Deprecated +public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> extends ProcessAllWindowFunction<IN, OUT, W> { private static final long serialVersionUID = 1L; - - - // -------------------------------------------------------------------------------------------- - // Runtime context access - // -------------------------------------------------------------------------------------------- - - private transient RuntimeContext runtimeContext; - - @Override - public void setRuntimeContext(RuntimeContext t) { - this.runtimeContext = t; - } - - @Override - public RuntimeContext getRuntimeContext() { - if (this.runtimeContext != null) { - return this.runtimeContext; - } else { - throw new IllegalStateException("The runtime context has not been initialized."); - } - } - - @Override - public IterationRuntimeContext getIterationRuntimeContext() { - if (this.runtimeContext == null) { - throw new IllegalStateException("The runtime context has not been initialized."); - } else if (this.runtimeContext instanceof IterationRuntimeContext) { - return (IterationRuntimeContext) this.runtimeContext; - } else { - throw new IllegalStateException("This stub is not part of an iteration step function."); - } - } - - // -------------------------------------------------------------------------------------------- - // Default life cycle methods - // -------------------------------------------------------------------------------------------- - - @Override - public void open(Configuration parameters) throws Exception {} - - @Override - public void close() throws Exception {} } http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java index ac55bc6..83da065 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java @@ -19,10 +19,6 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.windows.Window; /** @@ -33,53 +29,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * @param <OUT> The type of the output value. * @param <KEY> The type of the key. * @param <W> The type of {@code Window} that this window function can be applied on. + * + * @deprecated use {@link ProcessWindowFunction} instead */ @PublicEvolving -public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window> - extends ProcessWindowFunction<IN, OUT, KEY, W> - implements RichFunction { +@Deprecated +public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window> extends ProcessWindowFunction<IN, OUT, KEY, W> { private static final long serialVersionUID = 1L; - - - // -------------------------------------------------------------------------------------------- - // Runtime context access - // -------------------------------------------------------------------------------------------- - - private transient RuntimeContext runtimeContext; - - @Override - public void setRuntimeContext(RuntimeContext t) { - this.runtimeContext = t; - } - - @Override - public RuntimeContext getRuntimeContext() { - if (this.runtimeContext != null) { - return this.runtimeContext; - } else { - throw new IllegalStateException("The runtime context has not been initialized."); - } - } - - @Override - public IterationRuntimeContext getIterationRuntimeContext() { - if (this.runtimeContext == null) { - throw new IllegalStateException("The runtime context has not been initialized."); - } else if (this.runtimeContext instanceof IterationRuntimeContext) { - return (IterationRuntimeContext) this.runtimeContext; - } else { - throw new IllegalStateException("This stub is not part of an iteration step function."); - } - } - - // -------------------------------------------------------------------------------------------- - // Default life cycle methods - // -------------------------------------------------------------------------------------------- - - @Override - public void open(Configuration parameters) throws Exception {} - - @Override - public void close() throws Exception {} } http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index 4b8057f..4b0f5ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -24,11 +24,10 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -612,7 +611,7 @@ public class InternalWindowFunctionTest { } public static class ProcessWindowFunctionMock - extends RichProcessWindowFunction<Long, String, Long, TimeWindow> + extends ProcessWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { private static final long serialVersionUID = 1L; @@ -626,7 +625,7 @@ public class InternalWindowFunctionTest { } public static class AggregateProcessWindowFunctionMock - extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow> + extends ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { private static final long serialVersionUID = 1L; @@ -640,7 +639,7 @@ public class InternalWindowFunctionTest { } public static class AggregateProcessAllWindowFunctionMock - extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow> + extends ProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow> implements OutputTypeConfigurable<String> { private static final long serialVersionUID = 1L; @@ -679,7 +678,7 @@ public class InternalWindowFunctionTest { } public static class ProcessAllWindowFunctionMock - extends RichProcessAllWindowFunction<Long, String, TimeWindow> + extends ProcessAllWindowFunction<Long, String, TimeWindow> implements OutputTypeConfigurable<String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index a8d3154..2f7e302 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -35,9 +35,7 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; @@ -1038,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> { + private static class StatefulFunction extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> { // we use a concurrent map here even though there is no concurrency, to // get "volatile" style access to entries http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala index 2f0e48e..49911e4 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala @@ -18,10 +18,8 @@ package org.apache.flink.streaming.api.scala.function -import java.io.Serializable - import org.apache.flink.annotation.PublicEvolving -import org.apache.flink.api.common.functions.Function +import org.apache.flink.api.common.functions.AbstractRichFunction import org.apache.flink.api.common.state.KeyedStateStore import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -35,7 +33,8 @@ import org.apache.flink.util.Collector * @tparam W The type of the window. */ @PublicEvolving -abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable { +abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] + extends AbstractRichFunction { /** * Evaluates the window and outputs none or several elements. * http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala index bc79a26..d2075db 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala.function import java.io.Serializable import org.apache.flink.annotation.PublicEvolving -import org.apache.flink.api.common.functions.Function +import org.apache.flink.api.common.functions.AbstractRichFunction import org.apache.flink.api.common.state.KeyedStateStore import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -36,7 +36,9 @@ import org.apache.flink.util.Collector * @tparam W The type of the window. */ @PublicEvolving -abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { +abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] + extends AbstractRichFunction { + /** * Evaluates the window and outputs none or several elements. * http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala index 22d64a8..6edc1e6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala @@ -18,11 +18,7 @@ package org.apache.flink.streaming.api.scala.function -import java.beans.Transient - import org.apache.flink.annotation.Public -import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext} -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.Window /** @@ -34,53 +30,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window * @tparam W The type of the window. */ @Public +@deprecated("use [[ProcessAllWindowFunction]] instead") abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window] - extends ProcessAllWindowFunction[IN, OUT, W] - with RichFunction { - - @Transient - private var runtimeContext: RuntimeContext = null - - // -------------------------------------------------------------------------------------------- - // Runtime context access - // -------------------------------------------------------------------------------------------- - - override def setRuntimeContext(t: RuntimeContext) { - this.runtimeContext = t - } - - override def getRuntimeContext: RuntimeContext = { - if (this.runtimeContext != null) { - this.runtimeContext - } - else { - throw new IllegalStateException("The runtime context has not been initialized.") - } - } - - override def getIterationRuntimeContext: IterationRuntimeContext = { - if (this.runtimeContext == null) { - throw new IllegalStateException("The runtime context has not been initialized.") - } - else { - this.runtimeContext match { - case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext - case _ => - throw new IllegalStateException("This stub is not part of an iteration step function.") - } - } - } - - // -------------------------------------------------------------------------------------------- - // Default life cycle methods - // -------------------------------------------------------------------------------------------- - - @throws[Exception] - override def open(parameters: Configuration) { - } - - @throws[Exception] - override def close() { - } + extends ProcessAllWindowFunction[IN, OUT, W] { } http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala index 320685a..d9cd275 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala @@ -18,11 +18,7 @@ package org.apache.flink.streaming.api.scala.function -import java.beans.Transient - import org.apache.flink.annotation.Public -import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext} -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.Window /** @@ -35,53 +31,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window * @tparam W The type of the window. */ @Public +@deprecated("use [[ProcessWindowFunction]] instead") abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window] - extends ProcessWindowFunction[IN, OUT, KEY, W] - with RichFunction { - - @Transient - private var runtimeContext: RuntimeContext = null - - // -------------------------------------------------------------------------------------------- - // Runtime context access - // -------------------------------------------------------------------------------------------- - - override def setRuntimeContext(t: RuntimeContext) { - this.runtimeContext = t - } - - override def getRuntimeContext: RuntimeContext = { - if (this.runtimeContext != null) { - this.runtimeContext - } - else { - throw new IllegalStateException("The runtime context has not been initialized.") - } - } - - override def getIterationRuntimeContext: IterationRuntimeContext = { - if (this.runtimeContext == null) { - throw new IllegalStateException("The runtime context has not been initialized.") - } - else { - this.runtimeContext match { - case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext - case _ => - throw new IllegalStateException("This stub is not part of an iteration step function.") - } - } - } - - // -------------------------------------------------------------------------------------------- - // Default life cycle methods - // -------------------------------------------------------------------------------------------- - - @throws[Exception] - override def open(parameters: Configuration) { - } - - @throws[Exception] - override def close() { - } + extends ProcessWindowFunction[IN, OUT, KEY, W] { } http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala index 263373e..bc4b7dd 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -21,13 +21,9 @@ package org.apache.flink.streaming.api.scala.function.util import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction} -import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction} -import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction} import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction} import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction} import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction} -import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction} -import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction} import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -43,7 +39,7 @@ import scala.collection.JavaConverters._ */ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W]) - extends JRichProcessWindowFunction[IN, OUT, KEY, W] { + extends JProcessWindowFunction[IN, OUT, KEY, W] { override def process( key: KEY, @@ -82,7 +78,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( override def setRuntimeContext(t: RuntimeContext): Unit = { super.setRuntimeContext(t) func match { - case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t) + case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t) case _ => } } @@ -90,7 +86,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( override def open(parameters: Configuration): Unit = { super.open(parameters) func match { - case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters) + case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters) case _ => } } @@ -98,7 +94,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( override def close(): Unit = { super.close() func match { - case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close() + case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close() case _ => } } @@ -114,7 +110,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( */ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W]) - extends JRichProcessAllWindowFunction[IN, OUT, W] { + extends JProcessAllWindowFunction[IN, OUT, W] { override def process( context: JProcessAllWindowFunction[IN, OUT, W]#Context, @@ -145,7 +141,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( override def setRuntimeContext(t: RuntimeContext): Unit = { super.setRuntimeContext(t) func match { - case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t) + case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t) case _ => } } @@ -153,7 +149,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( override def open(parameters: Configuration): Unit = { super.open(parameters) func match { - case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters) + case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters) case _ => } } @@ -161,7 +157,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( override def close(): Unit = { super.close() func match { - case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.close() + case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.close() case _ => } } http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala index df005fa..146452b 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala @@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction +import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window] - extends RichProcessAllWindowFunction[T, T, W] { + extends ProcessAllWindowFunction[T, T, W] { override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = { for (value <- input) { http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala index d62f2d3..2ec179a 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala @@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction +import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window] - extends RichProcessWindowFunction[T, T, K, W] { + extends ProcessWindowFunction[T, T, K, W] { override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = { for (value <- input) {
