This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6a290caf82a [FLINK-36390][core] Remove deprecated open(Configuration)
method in RichFunction
6a290caf82a is described below
commit 6a290caf82a49639a60f2f31082e021d7a811e54
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Sep 12 16:18:34 2024 +0800
[FLINK-36390][core] Remove deprecated open(Configuration) method in
RichFunction
---
.../docs/dev/datastream/fault-tolerance/state.md | 2 +-
docs/content.zh/docs/learn-flink/etl.md | 6 +--
docs/content.zh/docs/learn-flink/event_driven.md | 4 +-
docs/content.zh/docs/ops/metrics.md | 14 +++---
.../docs/dev/datastream/fault-tolerance/state.md | 2 +-
docs/content/docs/learn-flink/etl.md | 6 +--
docs/content/docs/learn-flink/event_driven.md | 4 +-
docs/content/docs/ops/metrics.md | 14 +++---
.../api/common/functions/AbstractRichFunction.java | 3 +-
.../functions/BroadcastVariableInitializer.java | 2 +-
.../flink/api/common/functions/OpenContext.java | 4 +-
.../flink/api/common/functions/RichFunction.java | 58 +---------------------
.../flink/api/common/functions/RuntimeContext.java | 10 ++--
...text.java => WithConfigurationOpenContext.java} | 19 ++++---
.../flink/api/common/state/KeyedStateStore.java | 10 ++--
.../pattern/conditions/RichIterativeCondition.java | 7 ---
.../api/functions/KeyedStateReaderFunction.java | 12 +----
.../flink/state/api/SavepointDeepCopyTest.java | 7 ---
.../state/api/SavepointReaderKeyedStateITCase.java | 6 ---
.../state/api/input/KeyedStateInputFormatTest.java | 24 ---------
.../apache/flink/runtime/operators/BatchTask.java | 2 +-
.../api/checkpoint/CheckpointedFunction.java | 2 +-
.../functions/sink/OutputFormatSinkFunction.java | 21 ++------
.../api/functions/sink/PrintSinkFunction.java | 22 ++------
.../api/functions/sink/SocketClientSink.java | 21 +-------
.../source/FromSplittableIteratorFunction.java | 20 +-------
.../AbstractUdfStreamOperatorLifecycleTest.java | 2 +-
.../flink/table/functions/FunctionContext.java | 12 ++++-
.../planner/codegen/CollectorCodeGenerator.scala | 7 +--
.../table/planner/codegen/ExpressionReducer.scala | 6 +--
.../planner/codegen/FunctionCodeGenerator.scala | 4 +-
.../planner/codegen/LongHashJoinGenerator.scala | 3 +-
.../planner/codegen/LookupJoinCodeGenerator.scala | 6 +--
.../codegen/WatermarkGeneratorCodeGenerator.scala | 3 +-
.../codegen/calls/BridgingFunctionGenUtil.scala | 4 +-
.../expressions/utils/ExpressionTestBase.scala | 4 +-
.../operators/join/LookupJoinHarnessTest.java | 4 +-
pom.xml | 13 +++++
38 files changed, 113 insertions(+), 257 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
index a58b5a0807d..e4d8c3b9c13 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
@@ -160,7 +160,7 @@ public class CountWindowAverage extends
RichFlatMapFunction<Tuple2<Long, Long>,
}
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
diff --git a/docs/content.zh/docs/learn-flink/etl.md
b/docs/content.zh/docs/learn-flink/etl.md
index 591d6899ef9..b5e87ba44c1 100644
--- a/docs/content.zh/docs/learn-flink/etl.md
+++ b/docs/content.zh/docs/learn-flink/etl.md
@@ -236,7 +236,7 @@ minutesByStartCell
对其中的每一个接口,Flink 同样提供了一个所谓 "rich" 的变体,如 `RichFlatMapFunction`,其中增加了以下方法,包括:
-- `open(Configuration c)`
+- `open(OpenContext context)`
- `close()`
- `getRuntimeContext()`
@@ -280,7 +280,7 @@ public static class Deduplicator extends
RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
- public void open(Configuration conf) {
+ public void open(OpenContext ctx) {
ValueStateDescriptor<Boolean> desc = new
ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@@ -373,7 +373,7 @@ public static class ControlFunction extends
RichCoFlatMapFunction<String, String
private ValueState<Boolean> blocked;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
diff --git a/docs/content.zh/docs/learn-flink/event_driven.md
b/docs/content.zh/docs/learn-flink/event_driven.md
index 9037bc153ff..f95cf42eb16 100644
--- a/docs/content.zh/docs/learn-flink/event_driven.md
+++ b/docs/content.zh/docs/learn-flink/event_driven.md
@@ -79,7 +79,7 @@ public static class PseudoWindow extends
@Override
// 在初始化期间调用一次。
- public void open(Configuration conf) {
+ public void open(OpenContext ctx) {
. . .
}
@@ -126,7 +126,7 @@ public static class PseudoWindow extends
private transient MapState<Long, Float> sumOfTips;
@Override
-public void open(Configuration conf) {
+public void open(OpenContext ctx) {
MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
diff --git a/docs/content.zh/docs/ops/metrics.md
b/docs/content.zh/docs/ops/metrics.md
index e461044dddf..b9ab7201dce 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String>
{
private transient Counter counter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
@@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String,
String> {
private transient Counter counter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
@@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String,
String> {
private transient int valueToExpose = 0;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
@@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
@@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
@@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
com.codahale.metrics.Meter dropwizardMeter = new
com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md
b/docs/content/docs/dev/datastream/fault-tolerance/state.md
index 97115f7bba0..2486212055e 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md
@@ -192,7 +192,7 @@ public class CountWindowAverage extends
RichFlatMapFunction<Tuple2<Long, Long>,
}
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
diff --git a/docs/content/docs/learn-flink/etl.md
b/docs/content/docs/learn-flink/etl.md
index aa1423edb35..113bc6baf57 100644
--- a/docs/content/docs/learn-flink/etl.md
+++ b/docs/content/docs/learn-flink/etl.md
@@ -274,7 +274,7 @@ Abstract Method pattern.
For each of these interfaces, Flink also provides a so-called "rich" variant,
e.g.,
`RichFlatMapFunction`, which has some additional methods, including:
-- `open(Configuration c)`
+- `open(OpenContext context)`
- `close()`
- `getRuntimeContext()`
@@ -329,7 +329,7 @@ public static class Deduplicator extends
RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
- public void open(Configuration conf) {
+ public void open(OpenContext ctx) {
ValueStateDescriptor<Boolean> desc = new
ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@@ -447,7 +447,7 @@ public static class ControlFunction extends
RichCoFlatMapFunction<String, String
private ValueState<Boolean> blocked;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
diff --git a/docs/content/docs/learn-flink/event_driven.md
b/docs/content/docs/learn-flink/event_driven.md
index e03d4598e20..173b0829d31 100644
--- a/docs/content/docs/learn-flink/event_driven.md
+++ b/docs/content/docs/learn-flink/event_driven.md
@@ -75,7 +75,7 @@ public static class PseudoWindow extends
@Override
// Called once during initialization.
- public void open(Configuration conf) {
+ public void open(OpenContext ctx) {
. . .
}
@@ -116,7 +116,7 @@ Things to be aware of:
private transient MapState<Long, Float> sumOfTips;
@Override
-public void open(Configuration conf) {
+public void open(OpenContext ctx) {
MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 5ab5cb21180..3347bd48c0a 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String>
{
private transient Counter counter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
@@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String,
String> {
private transient Counter counter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
@@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String,
String> {
private transient int valueToExpose = 0;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
@@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
@@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
@@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext ctx) {
com.codahale.metrics.Meter dropwizardMeter = new
com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 7a25c4abc72..d4ae07068d3 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.Configuration;
import java.io.Serializable;
@@ -69,7 +68,7 @@ public abstract class AbstractRichFunction implements
RichFunction, Serializable
//
--------------------------------------------------------------------------------------------
@Override
- public void open(Configuration parameters) throws Exception {}
+ public void open(OpenContext openContext) throws Exception {}
@Override
public void close() throws Exception {}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
index e99e68a078d..42ef2a8af40 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
@@ -41,7 +41,7 @@ import org.apache.flink.annotation.Public;
*
* private Map<Long, String> map;
*
- * public void open(Configuration cfg) throws Exception {
+ * public void open(OpenContext ctx) throws Exception {
* getRuntimeContext().getBroadcastVariableWithInitializer("mapvar",
* new BroadcastVariableInitializer<Tuple2<Long, String>,
Map<Long, String>>() {
*
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
index 4ff5484b3b0..ff07bfc1816 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -18,12 +18,12 @@
package org.apache.flink.api.common.functions;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Public;
/**
* The {@link OpenContext} interface provides necessary information required
by the {@link
* RichFunction} when it is opened. The {@link OpenContext} is currently empty
because it can be
* used to add more methods without affecting the signature of {@code
RichFunction#open}.
*/
-@PublicEvolving
+@Public
public interface OpenContext {}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index ae83fb30f2b..f1aa70a97d9 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
/**
* An base interface for all rich user-defined functions. This class defines
methods for the life
@@ -30,51 +29,6 @@ import org.apache.flink.configuration.Configuration;
@Public
public interface RichFunction extends Function {
- /**
- * Initialization method for the function. It is called before the actual
working methods (like
- * <i>map</i> or <i>join</i>) and thus suitable for one time setup work.
For functions that are
- * part of an iteration, this method will be invoked at the beginning of
each iteration
- * superstep.
- *
- * <p>The configuration object passed to the function can be used for
configuration and
- * initialization. The configuration contains all parameters that were
configured on the
- * function in the program composition.
- *
- * <pre>{@code
- * public class MyFilter extends RichFilterFunction<String> {
- *
- * private String searchString;
- *
- * public void open(Configuration parameters) {
- * this.searchString = parameters.getString("foo");
- * }
- *
- * public boolean filter(String value) {
- * return value.equals(searchString);
- * }
- * }
- * }</pre>
- *
- * <p>By default, this method does nothing.
- *
- * @param parameters The configuration containing the parameters attached
to the contract.
- * @throws Exception Implementations may forward exceptions, which are
caught by the runtime.
- * When the runtime catches an exception, it aborts the task and lets
the fail-over logic
- * decide whether to retry the task execution.
- * @see org.apache.flink.configuration.Configuration
- * @deprecated This method is deprecated since Flink 1.19. The users are
recommended to
- * implement {@code open(OpenContext openContext)} and implement
{@code open(Configuration
- * parameters)} with an empty body instead. 1. If you implement {@code
open(OpenContext
- * openContext)}, the {@code open(OpenContext openContext)} will be
invoked and the {@code
- * open(Configuration parameters)} won't be invoked. 2. If you don't
implement {@code
- * open(OpenContext openContext)}, the {@code open(Configuration
parameters)} will be
- * invoked in the default implementation of the {@code
open(OpenContext openContext)}.
- * @see <a
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
- * FLIP-344: Remove parameter in RichFunction#open </a>
- */
- @Deprecated
- void open(Configuration parameters) throws Exception;
-
/**
* Initialization method for the function. It is called before the actual
working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work.
For functions that are
@@ -100,14 +54,6 @@ public interface RichFunction extends Function {
* }
* }</pre>
*
- * <p>By default, this method does nothing.
- *
- * <p>1. If you implement {@code open(OpenContext openContext)}, the
{@code open(OpenContext
- * openContext)} will be invoked and the {@code open(Configuration
parameters)} won't be
- * invoked. 2. If you don't implement {@code open(OpenContext
openContext)}, the {@code
- * open(Configuration parameters)} will be invoked in the default
implementation of the {@code
- * open(OpenContext openContext)}.
- *
* @param openContext The context containing information about the context
in which the function
* is opened.
* @throws Exception Implementations may forward exceptions, which are
caught by the runtime.
@@ -115,9 +61,7 @@ public interface RichFunction extends Function {
* decide whether to retry the task execution.
*/
@PublicEvolving
- default void open(OpenContext openContext) throws Exception {
- open(new Configuration());
- }
+ void open(OpenContext openContext) throws Exception;
/**
* Tear-down method for the user code. It is called after the last call to
the main working
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index eb94f65a2f8..b36c0adaa2a 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -232,7 +232,7 @@ public interface RuntimeContext {
*
* private ValueState<Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getState(
* new ValueStateDescriptor<Long>("count",
LongSerializer.INSTANCE, 0L));
* }
@@ -269,7 +269,7 @@ public interface RuntimeContext {
*
* private ListState<MyType> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getListState(
* new ListStateDescriptor<>("myState", MyType.class));
* }
@@ -310,7 +310,7 @@ public interface RuntimeContext {
*
* private ReducingState<Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getReducingState(
* new ReducingStateDescriptor<>("sum", (a, b) -> a + b,
Long.class));
* }
@@ -348,7 +348,7 @@ public interface RuntimeContext {
*
* private AggregatingState<MyType, Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getAggregatingState(
* new AggregatingStateDescriptor<>("sum",
aggregateFunction, Long.class));
* }
@@ -388,7 +388,7 @@ public interface RuntimeContext {
*
* private MapState<MyType, Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getMapState(
* new MapStateDescriptor<>("sum", MyType.class,
Long.class));
* }
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java
similarity index 67%
copy from
flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
copy to
flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java
index 4ff5484b3b0..7ce3cbcaa30 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java
@@ -19,11 +19,18 @@
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
-/**
- * The {@link OpenContext} interface provides necessary information required
by the {@link
- * RichFunction} when it is opened. The {@link OpenContext} is currently empty
because it can be
- * used to add more methods without affecting the signature of {@code
RichFunction#open}.
- */
+/** A special {@link OpenContext} for passing configuration to udf. */
@PublicEvolving
-public interface OpenContext {}
+public class WithConfigurationOpenContext implements OpenContext {
+ private final Configuration configuration;
+
+ public WithConfigurationOpenContext(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 18e2d667031..8ce0d9c2c64 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -46,7 +46,7 @@ public interface KeyedStateStore {
*
* private ValueState<Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getState(
* new ValueStateDescriptor<Long>("count",
LongSerializer.INSTANCE, 0L));
* }
@@ -83,7 +83,7 @@ public interface KeyedStateStore {
*
* private ListState<MyType> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getListState(
* new ListStateDescriptor<>("myState", MyType.class));
* }
@@ -124,7 +124,7 @@ public interface KeyedStateStore {
*
* private ReducingState<Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getReducingState(
* new ReducingStateDescriptor<>("sum", (a, b) -> a + b,
Long.class));
* }
@@ -162,7 +162,7 @@ public interface KeyedStateStore {
*
* private AggregatingState<MyType, Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getAggregatingState(
* new AggregatingStateDescriptor<>("sum",
aggregateFunction, Long.class));
* }
@@ -202,7 +202,7 @@ public interface KeyedStateStore {
*
* private MapState<MyType, Long> state;
*
- * public void open(Configuration cfg) {
+ * public void open(OpenContext ctx) {
* state = getRuntimeContext().getMapState(
* new MapStateDescriptor<>("sum", MyType.class,
Long.class));
* }
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
index 4aaf582ecc5..e5f609500d2 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -22,7 +22,6 @@ import
org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
/**
@@ -69,12 +68,6 @@ public abstract class RichIterativeCondition<T> extends
IterativeCondition<T>
@Override
public void open(OpenContext openContext) throws Exception {}
- @Override
- public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked. Please
use open(OpenContext) instead.");
- }
-
@Override
public void close() throws Exception {}
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
index 5bbf8c981fa..652d9253d45 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
@@ -21,7 +21,6 @@ package org.apache.flink.state.api.functions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.util.Set;
@@ -32,7 +31,7 @@ import java.util.Set;
* <p>For every key {@link #readKey(Object, Context, Collector)} is invoked.
This can produce zero
* or more elements as output.
*
- * <p><b>NOTE:</b> State descriptors must be eagerly registered in {@code
open(Configuration)}. Any
+ * <p><b>NOTE:</b> State descriptors must be eagerly registered in {@code
open(OpenContext)}. Any
* attempt to dynamically register states inside of {@code readKey} will
result in a {@code
* RuntimeException}.
*
@@ -51,15 +50,6 @@ public abstract class KeyedStateReaderFunction<K, OUT>
extends AbstractRichFunct
private static final long serialVersionUID = 3873843034140417407L;
- /**
- * Initialization method for the function. It is called before {@link
#readKey(Object, Context,
- * Collector)} and thus suitable for one time setup work.
- *
- * <p>This is the only method that my register state descriptors within a
{@code
- * KeyedStateReaderFunction}.
- */
- public abstract void open(Configuration parameters) throws Exception;
-
/**
* Process one key from the restored state backend.
*
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
index 17845da02ef..ce4094a4d0b 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
@@ -110,12 +109,6 @@ public class SavepointDeepCopyTest extends
AbstractTestBaseJUnit4 {
state = getRuntimeContext().getState(stateDescriptor);
}
- @Override
- public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked.
Please use open(OpenContext) instead.");
- }
-
@Override
public void readKey(String key, Context ctx, Collector<Tuple2<String,
String>> out)
throws Exception {
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
index 75f9f6f3dee..de3bdcd5f3a 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
@@ -112,12 +112,6 @@ public abstract class SavepointReaderKeyedStateITCase<B
extends StateBackend>
state = getRuntimeContext().getState(valueState);
}
- @Override
- public void open(Configuration parameters) {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked.
Please use open(OpenContext) instead.");
- }
-
@Override
public void readKey(Integer key, Context ctx, Collector<Pojo> out)
throws Exception {
Pojo pojo = new Pojo();
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
index c287cdb5598..9b9f845bba1 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
@@ -262,12 +262,6 @@ public class KeyedStateInputFormatTest {
state = getRuntimeContext().getState(stateDescriptor);
}
- @Override
- public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked.
Please use open(OpenContext) instead.");
- }
-
@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx,
Collector<Integer> out)
@@ -284,12 +278,6 @@ public class KeyedStateInputFormatTest {
state = getRuntimeContext().getState(stateDescriptor);
}
- @Override
- public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked.
Please use open(OpenContext) instead.");
- }
-
@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx,
Collector<Integer> out)
@@ -306,12 +294,6 @@ public class KeyedStateInputFormatTest {
getRuntimeContext().getState(stateDescriptor);
}
- @Override
- public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked.
Please use open(OpenContext) instead.");
- }
-
@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx,
Collector<Integer> out)
@@ -360,12 +342,6 @@ public class KeyedStateInputFormatTest {
state = getRuntimeContext().getState(stateDescriptor);
}
- @Override
- public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException(
- "This method is deprecated and shouldn't be invoked.
Please use open(OpenContext) instead.");
- }
-
@Override
public void readKey(
Integer key, KeyedStateReaderFunction.Context ctx,
Collector<Integer> out)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 658e026f630..9388d88cb15 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1496,7 +1496,7 @@ public class BatchTask<S extends Function, OT> extends
AbstractInvokable
FunctionUtils.openFunction(stub, DefaultOpenContext.INSTANCE);
} catch (Throwable t) {
throw new Exception(
- "The user defined 'open(Configuration)' method in "
+ "The user defined 'open(OpenContext)' method in "
+ stub.getClass().toString()
+ " caused an exception: "
+ t.getMessage(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index af3a2c7a9ff..f46855cc559 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -131,7 +131,7 @@ import
org.apache.flink.runtime.state.FunctionSnapshotContext;
*
* private ValueState<Long> count;
*
- * public void open(Configuration cfg) throws Exception {
+ * public void open(OpenContext ctx) throws Exception {
* count = getRuntimeContext().getState(new
ValueStateDescriptor<>("myCount", Long.class));
* }
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
index c0894e52414..f7c8b2c55e9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
@@ -57,26 +58,10 @@ public class OutputFormatSinkFunction<IN> extends
RichSinkFunction<IN>
this.format = format;
}
- /**
- * Initialization method for the {@link OutputFormatSinkFunction}.
- *
- * @param parameters The configuration containing the parameters attached
to the contract.
- * @throws Exception if an error happens.
- * @deprecated This method is deprecated since Flink 1.19. The users are
recommended to
- * implement {@code open(OpenContext openContext)} and override {@code
open(Configuration
- * parameters)} with an empty body instead. 1. If you implement {@code
open(OpenContext
- * openContext)}, the {@code open(OpenContext openContext)} will be
invoked and the {@code
- * open(Configuration parameters)} won't be invoked. 2. If you don't
implement {@code
- * open(OpenContext openContext)}, the {@code open(Configuration
parameters)} will be
- * invoked in the default implementation of the {@code
open(OpenContext openContext)}.
- * @see <a
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
- * FLIP-344: Remove parameter in RichFunction#open </a>
- */
- @Deprecated
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
RuntimeContext context = getRuntimeContext();
- format.configure(parameters);
+ format.configure(new Configuration());
int indexInSubtaskGroup =
context.getTaskInfo().getIndexOfThisSubtask();
int currentNumberOfSubtasks =
context.getTaskInfo().getNumberOfParallelSubtasks();
format.open(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
index e9c7b546e91..5ece13b2a54 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
/**
@@ -69,25 +69,9 @@ public class PrintSinkFunction<IN> extends
RichSinkFunction<IN>
writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
}
- /**
- * Initialization method for the {@link PrintSinkFunction}.
- *
- * @param parameters The configuration containing the parameters attached
to the contract.
- * @throws Exception if an error happens.
- * @deprecated This method is deprecated since Flink 1.19. The users are
recommended to
- * implement {@code open(OpenContext openContext)} and override {@code
open(Configuration
- * parameters)} with an empty body instead. 1. If you implement {@code
open(OpenContext
- * openContext)}, the {@code open(OpenContext openContext)} will be
invoked and the {@code
- * open(Configuration parameters)} won't be invoked. 2. If you don't
implement {@code
- * open(OpenContext openContext)}, the {@code open(Configuration
parameters)} will be
- * invoked in the default implementation of the {@code
open(OpenContext openContext)}.
- * @see <a
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
- * FLIP-344: Remove parameter in RichFunction#open </a>
- */
- @Deprecated
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open(OpenContext openContext) throws Exception {
+ super.open(openContext);
StreamingRuntimeContext context = (StreamingRuntimeContext)
getRuntimeContext();
writer.open(
context.getTaskInfo().getIndexOfThisSubtask(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 40162f3ca35..d6736526466 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.SerializableObject;
import org.slf4j.Logger;
@@ -129,25 +129,8 @@ public class SocketClientSink<IN> extends
RichSinkFunction<IN> {
// ------------------------------------------------------------------------
// Life cycle
// ------------------------------------------------------------------------
-
- /**
- * Initialize the connection with the Socket in the server.
- *
- * @param parameters The configuration containing the parameters attached
to the contract.
- * @throws Exception if an error happens.
- * @deprecated This method is deprecated since Flink 1.19. The users are
recommended to
- * implement {@code open(OpenContext openContext)} and override {@code
open(Configuration
- * parameters)} with an empty body instead. 1. If you implement {@code
open(OpenContext
- * openContext)}, the {@code open(OpenContext openContext)} will be
invoked and the {@code
- * open(Configuration parameters)} won't be invoked. 2. If you don't
implement {@code
- * open(OpenContext openContext)}, the {@code open(Configuration
parameters)} will be
- * invoked in the default implementation of the {@code
open(OpenContext openContext)}.
- * @see <a
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
- * FLIP-344: Remove parameter in RichFunction#open </a>
- */
- @Deprecated
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
try {
synchronized (lock) {
createConnection();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index 78962212951..a44e60c7e09 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.util.SplittableIterator;
import java.util.Iterator;
@@ -46,24 +46,8 @@ public class FromSplittableIteratorFunction<T> extends
RichParallelSourceFunctio
this.fullIterator = iterator;
}
- /**
- * Initialization method for the {@link FromSplittableIteratorFunction}.
- *
- * @param parameters The configuration containing the parameters attached
to the contract.
- * @throws Exception if an error happens.
- * @deprecated This method is deprecated since Flink 1.19. The users are
recommended to
- * implement {@code open(OpenContext openContext)} and override {@code
open(Configuration
- * parameters)} with an empty body instead. 1. If you implement {@code
open(OpenContext
- * openContext)}, the {@code open(OpenContext openContext)} will be
invoked and the {@code
- * open(Configuration parameters)} won't be invoked. 2. If you don't
implement {@code
- * open(OpenContext openContext)}, the {@code open(Configuration
parameters)} will be
- * invoked in the default implementation of the {@code
open(OpenContext openContext)}.
- * @see <a
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
- * FLIP-344: Remove parameter in RichFunction#open </a>
- */
- @Deprecated
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
int numberOfSubTasks =
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
int indexofThisSubTask =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask];
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index e91f7e9fc03..bf76308424d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -114,7 +114,7 @@ class AbstractUdfStreamOperatorLifecycleTest {
private static final String ALL_METHODS_RICH_FUNCTION =
"[close[], getIterationRuntimeContext[], getRuntimeContext[]"
- + ", open[class
org.apache.flink.configuration.Configuration], open[interface
org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface
"
+ + ", open[interface
org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface
"
+ "org.apache.flink.api.common.functions.RuntimeContext]]";
private static final List<String> ACTUAL_ORDER_TRACKING =
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
index aa18c5608ef..5facea46ab4 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -20,7 +20,9 @@ package org.apache.flink.table.functions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.WithConfigurationOpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -62,10 +64,16 @@ public class FunctionContext {
public FunctionContext(
@Nullable RuntimeContext context,
@Nullable ClassLoader userClassLoader,
- @Nullable Configuration jobParameters) {
+ @Nullable OpenContext openContext) {
this.context = context;
this.userClassLoader = userClassLoader;
- this.jobParameters = jobParameters != null ? jobParameters.toMap() :
null;
+ if (openContext instanceof WithConfigurationOpenContext) {
+ Configuration configuration =
+ ((WithConfigurationOpenContext)
openContext).getConfiguration();
+ this.jobParameters = configuration.toMap();
+ } else {
+ this.jobParameters = null;
+ }
}
public FunctionContext(RuntimeContext context) {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
index bd9e4d8f253..5fcede82e7d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.codegen
+import org.apache.flink.api.common.functions.{DefaultOpenContext, OpenContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.Indenter.toISC
@@ -74,7 +75,7 @@ object CollectorCodeGenerator {
}
@Override
- public void open(${className[Configuration]} parameters) throws
Exception {
+ public void open(${className[OpenContext]} openContext) throws
Exception {
${ctx.reuseOpenCode()}
}
@@ -142,7 +143,7 @@ object CollectorCodeGenerator {
}
@Override
- public void open(${className[Configuration]} parameters) throws
Exception {
+ public void open(${className[OpenContext]} openContext) throws
Exception {
${ctx.reuseOpenCode()}
}
@@ -180,7 +181,7 @@ object CollectorCodeGenerator {
s"""
|$collectorTerm = new ${generatedCollector.getClassName}();
|$collectorTerm.setRuntimeContext(getRuntimeContext());
- |$collectorTerm.open(new ${className[Configuration]}());
+ |$collectorTerm.open(new ${className[DefaultOpenContext]}());
|""".stripMargin
ctx.addReusableOpenStatement(openCollector)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 63616c0a409..11a4ac1f15b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.codegen
-import org.apache.flink.api.common.functions.{MapFunction, OpenContext,
RichMapFunction}
+import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction,
OpenContext, RichMapFunction, WithConfigurationOpenContext}
import org.apache.flink.configuration.{Configuration, PipelineOptions,
ReadableConfig}
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.data.{DecimalData, GenericRowData, TimestampData}
@@ -107,7 +107,7 @@ class ExpressionReducer(
.getOrElse(new Configuration)
val reduced =
try {
- richMapFunction.open(parameters)
+ richMapFunction.open(new WithConfigurationOpenContext(parameters))
// execute
richMapFunction.map(EMPTY_ROW)
} catch {
@@ -315,7 +315,7 @@ class ConstantCodeGeneratorContext(tableConfig:
ReadableConfig, classLoader: Cla
super.addReusableFunction(
function,
classOf[FunctionContext],
- Seq("null", "this.getClass().getClassLoader()", "parameters"))
+ Seq("null", "this.getClass().getClassLoader()", "openContext"))
}
override def addReusableConverter(dataType: DataType, classLoaderTerm:
String = null): String = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index 76a5c4ca460..3b5943d6ab6 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -154,7 +154,7 @@ object FunctionCodeGenerator {
${ctx.reuseConstructorCode(funcName)}
@Override
- public void open(${classOf[Configuration].getCanonicalName}
parameters) throws Exception {
+ public void open(${classOf[OpenContext].getCanonicalName} openContext)
throws Exception {
${ctx.reuseOpenCode()}
}
@@ -230,7 +230,7 @@ object FunctionCodeGenerator {
${ctx.reuseConstructorCode(funcName)}
@Override
- public void open(${className[Configuration]} parameters) throws
Exception {
+ public void open(${className[OpenContext]} openContext) throws
Exception {
${ctx.reuseOpenCode()}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index fdc7ac8036b..4256c90b596 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.codegen
+import org.apache.flink.api.common.functions.DefaultOpenContext
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.metrics.Gauge
import org.apache.flink.table.data.{RowData, TimestampData}
@@ -157,7 +158,7 @@ object LongHashJoinGenerator {
val condRefs = ctx.addReusableObject(condFunc.getReferences, "condRefs")
ctx.addReusableInitStatement(s"condFunc = new
${condFunc.getClassName}($condRefs);")
ctx.addReusableOpenStatement(s"condFunc.setRuntimeContext(getRuntimeContext());")
- ctx.addReusableOpenStatement(s"condFunc.open(new
${className[Configuration]}());")
+ ctx.addReusableOpenStatement(s"condFunc.open(new
${className[DefaultOpenContext]}());")
ctx.addReusableCloseStatement(s"condFunc.close();")
val leftIsBuildTerm = newName(ctx, "leftIsBuild")
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 45a615707f8..5aa5b6c8d20 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.codegen
-import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function,
OpenContext}
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.streaming.api.functions.async.AsyncFunction
import org.apache.flink.table.api.ValidationException
@@ -381,7 +381,7 @@ object LookupJoinCodeGenerator {
}
@Override
- public void open(${className[Configuration]} parameters) throws
Exception {
+ public void open(${className[OpenContext]} openContext) throws
Exception {
${ctx.reuseOpenCode()}
}
@@ -492,7 +492,7 @@ object LookupJoinCodeGenerator {
}
@Override
- public void open(${className[Configuration]} parameters) throws
Exception {
+ public void open(${className[OpenContext]} openContext) throws
Exception {
${ctx.reuseOpenCode()}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
index e15c2c8a205..18f3337902f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.codegen
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier
+import org.apache.flink.api.common.functions.OpenContext
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -87,7 +88,7 @@ object WatermarkGeneratorCodeGenerator {
}
@Override
- public void open(${classOf[Configuration].getCanonicalName}
parameters) throws Exception {
+ public void open(${classOf[OpenContext].getCanonicalName} openContext)
throws Exception {
${ctx.reuseOpenCode()}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
index 5ad355e19ee..f04496d4d90 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.codegen.calls
-import org.apache.flink.api.common.functions.{AbstractRichFunction,
RichFunction}
+import org.apache.flink.api.common.functions.{AbstractRichFunction,
OpenContext, RichFunction}
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.table.api.{DataTypes, TableException}
import org.apache.flink.table.api.Expressions.callSql
@@ -630,7 +630,7 @@ object BridgingFunctionGenUtil {
| ${ctx.reuseInitCode()}
| }
|
- | public void open(${className[Configuration]} parameters) throws
Exception {
+ | public void open(${className[OpenContext]} openContext) throws
Exception {
| ${ctx.reuseOpenCode()}
| }
|
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index ffbdfec752c..3f49c9bf116 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.expressions.utils
import org.apache.flink.api.common.{TaskInfo, TaskInfoImpl}
-import org.apache.flink.api.common.functions.{MapFunction, RichFunction,
RichMapFunction}
+import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction,
OpenContext, RichFunction, RichMapFunction}
import org.apache.flink.api.common.functions.util.RuntimeUDFContext
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
@@ -241,7 +241,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean =
true) {
Collections.emptyMap(),
null)
richMapper.setRuntimeContext(t)
- richMapper.open(new Configuration())
+ richMapper.open(DefaultOpenContext.INSTANCE)
}
val testRow = if (containsLegacyTypes) {
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
index 0b8f2a6fc35..2851a100cd3 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.runtime.operators.join;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -332,7 +332,7 @@ class LookupJoinHarnessTest {
private static final long serialVersionUID = 1L;
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext context) throws Exception {
// do nothing
}
diff --git a/pom.xml b/pom.xml
index 907b938e62d..596e9ca4946 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2339,6 +2339,19 @@ under the License.
<!-- Mark these
2 methods to @Internal. Tracked under FLINK-34130, should be removed in 2.0 -->
<exclude>org.apache.flink.configuration.Configuration#getBytes(java.lang.String,byte[])</exclude>
<exclude>org.apache.flink.configuration.Configuration#setBytes(java.lang.String,byte[])</exclude>
+ <!-- FLIP-344:
Remove parameter in RichFunction#open in 2.0 -->
+
<exclude>org.apache.flink.api.common.functions.AbstractRichFunction#open(org.apache.flink.configuration.Configuration)</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichCoGroupFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichCrossFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichFilterFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichFlatJoinFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichFlatMapFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichGroupCombineFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichGroupReduceFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichJoinFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichMapFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichMapPartitionFunction</exclude>
+
<exclude>org.apache.flink.api.common.functions.RichReduceFunction</exclude>
<!--
FLINK-34085 Deprecated string config should be removed in 2.0 -->
<exclude>org.apache.flink.configuration.ConfigConstants</exclude>
<!--
FLINK-35886: WatermarksWithIdleness constructor was marked as deprecated -->