This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a290caf82a [FLINK-36390][core] Remove deprecated open(Configuration) method in RichFunction
6a290caf82a is described below

commit 6a290caf82a49639a60f2f31082e021d7a811e54
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Sep 12 16:18:34 2024 +0800

    [FLINK-36390][core] Remove deprecated open(Configuration) method in RichFunction
---
 .../docs/dev/datastream/fault-tolerance/state.md   |  2 +-
 docs/content.zh/docs/learn-flink/etl.md            |  6 +--
 docs/content.zh/docs/learn-flink/event_driven.md   |  4 +-
 docs/content.zh/docs/ops/metrics.md                | 14 +++---
 .../docs/dev/datastream/fault-tolerance/state.md   |  2 +-
 docs/content/docs/learn-flink/etl.md               |  6 +--
 docs/content/docs/learn-flink/event_driven.md      |  4 +-
 docs/content/docs/ops/metrics.md                   | 14 +++---
 .../api/common/functions/AbstractRichFunction.java |  3 +-
 .../functions/BroadcastVariableInitializer.java    |  2 +-
 .../flink/api/common/functions/OpenContext.java    |  4 +-
 .../flink/api/common/functions/RichFunction.java   | 58 +---------------------
 .../flink/api/common/functions/RuntimeContext.java | 10 ++--
 ...text.java => WithConfigurationOpenContext.java} | 19 ++++---
 .../flink/api/common/state/KeyedStateStore.java    | 10 ++--
 .../pattern/conditions/RichIterativeCondition.java |  7 ---
 .../api/functions/KeyedStateReaderFunction.java    | 12 +----
 .../flink/state/api/SavepointDeepCopyTest.java     |  7 ---
 .../state/api/SavepointReaderKeyedStateITCase.java |  6 ---
 .../state/api/input/KeyedStateInputFormatTest.java | 24 ---------
 .../apache/flink/runtime/operators/BatchTask.java  |  2 +-
 .../api/checkpoint/CheckpointedFunction.java       |  2 +-
 .../functions/sink/OutputFormatSinkFunction.java   | 21 ++------
 .../api/functions/sink/PrintSinkFunction.java      | 22 ++------
 .../api/functions/sink/SocketClientSink.java       | 21 +-------
 .../source/FromSplittableIteratorFunction.java     | 20 +-------
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  2 +-
 .../flink/table/functions/FunctionContext.java     | 12 ++++-
 .../planner/codegen/CollectorCodeGenerator.scala   |  7 +--
 .../table/planner/codegen/ExpressionReducer.scala  |  6 +--
 .../planner/codegen/FunctionCodeGenerator.scala    |  4 +-
 .../planner/codegen/LongHashJoinGenerator.scala    |  3 +-
 .../planner/codegen/LookupJoinCodeGenerator.scala  |  6 +--
 .../codegen/WatermarkGeneratorCodeGenerator.scala  |  3 +-
 .../codegen/calls/BridgingFunctionGenUtil.scala    |  4 +-
 .../expressions/utils/ExpressionTestBase.scala     |  4 +-
 .../operators/join/LookupJoinHarnessTest.java      |  4 +-
 pom.xml                                            | 13 +++++
 38 files changed, 113 insertions(+), 257 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md 
b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
index a58b5a0807d..e4d8c3b9c13 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
@@ -160,7 +160,7 @@ public class CountWindowAverage extends 
RichFlatMapFunction<Tuple2<Long, Long>,
     }
 
     @Override
-    public void open(Configuration config) {
+    public void open(OpenContext ctx) {
         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                 new ValueStateDescriptor<>(
                         "average", // the state name
diff --git a/docs/content.zh/docs/learn-flink/etl.md 
b/docs/content.zh/docs/learn-flink/etl.md
index 591d6899ef9..b5e87ba44c1 100644
--- a/docs/content.zh/docs/learn-flink/etl.md
+++ b/docs/content.zh/docs/learn-flink/etl.md
@@ -236,7 +236,7 @@ minutesByStartCell
 
 对其中的每一个接口,Flink 同样提供了一个所谓 "rich" 的变体,如 `RichFlatMapFunction`,其中增加了以下方法,包括:
 
-- `open(Configuration c)`
+- `open(OpenContext context)`
 - `close()`
 - `getRuntimeContext()`
 
@@ -280,7 +280,7 @@ public static class Deduplicator extends 
RichFlatMapFunction<Event, Event> {
     ValueState<Boolean> keyHasBeenSeen;
 
     @Override
-    public void open(Configuration conf) {
+    public void open(OpenContext ctx) {
         ValueStateDescriptor<Boolean> desc = new 
ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
         keyHasBeenSeen = getRuntimeContext().getState(desc);
     }
@@ -373,7 +373,7 @@ public static class ControlFunction extends 
RichCoFlatMapFunction<String, String
     private ValueState<Boolean> blocked;
       
     @Override
-    public void open(Configuration config) {
+    public void open(OpenContext ctx) {
         blocked = getRuntimeContext()
             .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
     }
diff --git a/docs/content.zh/docs/learn-flink/event_driven.md 
b/docs/content.zh/docs/learn-flink/event_driven.md
index 9037bc153ff..f95cf42eb16 100644
--- a/docs/content.zh/docs/learn-flink/event_driven.md
+++ b/docs/content.zh/docs/learn-flink/event_driven.md
@@ -79,7 +79,7 @@ public static class PseudoWindow extends
 
     @Override
     // 在初始化期间调用一次。
-    public void open(Configuration conf) {
+    public void open(OpenContext ctx) {
         . . .
     }
 
@@ -126,7 +126,7 @@ public static class PseudoWindow extends
 private transient MapState<Long, Float> sumOfTips;
 
 @Override
-public void open(Configuration conf) {
+public void open(OpenContext ctx) {
 
     MapStateDescriptor<Long, Float> sumDesc =
             new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index e461044dddf..b9ab7201dce 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String> 
{
   private transient Counter counter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.counter = getRuntimeContext()
       .getMetricGroup()
       .counter("myCounter");
@@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String, 
String> {
   private transient Counter counter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.counter = getRuntimeContext()
       .getMetricGroup()
       .counter("myCustomCounter", new CustomCounter());
@@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String, 
String> {
   private transient int valueToExpose = 0;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     getRuntimeContext()
       .getMetricGroup()
       .gauge("MyGauge", new Gauge<Integer>() {
@@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Histogram histogram;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.histogram = getRuntimeContext()
       .getMetricGroup()
       .histogram("myHistogram", new MyHistogram());
@@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Histogram histogram;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     com.codahale.metrics.Histogram dropwizardHistogram =
       new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
 
@@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Meter meter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.meter = getRuntimeContext()
       .getMetricGroup()
       .meter("myMeter", new MyMeter());
@@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Meter meter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     com.codahale.metrics.Meter dropwizardMeter = new 
com.codahale.metrics.Meter();
 
     this.meter = getRuntimeContext()
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md 
b/docs/content/docs/dev/datastream/fault-tolerance/state.md
index 97115f7bba0..2486212055e 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md
@@ -192,7 +192,7 @@ public class CountWindowAverage extends 
RichFlatMapFunction<Tuple2<Long, Long>,
     }
 
     @Override
-    public void open(Configuration config) {
+    public void open(OpenContext ctx) {
         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                 new ValueStateDescriptor<>(
                         "average", // the state name
diff --git a/docs/content/docs/learn-flink/etl.md 
b/docs/content/docs/learn-flink/etl.md
index aa1423edb35..113bc6baf57 100644
--- a/docs/content/docs/learn-flink/etl.md
+++ b/docs/content/docs/learn-flink/etl.md
@@ -274,7 +274,7 @@ Abstract Method pattern.
 For each of these interfaces, Flink also provides a so-called "rich" variant, 
e.g.,
 `RichFlatMapFunction`, which has some additional methods, including:
 
-- `open(Configuration c)`
+- `open(OpenContext context)`
 - `close()`
 - `getRuntimeContext()`
 
@@ -329,7 +329,7 @@ public static class Deduplicator extends 
RichFlatMapFunction<Event, Event> {
     ValueState<Boolean> keyHasBeenSeen;
 
     @Override
-    public void open(Configuration conf) {
+    public void open(OpenContext ctx) {
         ValueStateDescriptor<Boolean> desc = new 
ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
         keyHasBeenSeen = getRuntimeContext().getState(desc);
     }
@@ -447,7 +447,7 @@ public static class ControlFunction extends 
RichCoFlatMapFunction<String, String
     private ValueState<Boolean> blocked;
       
     @Override
-    public void open(Configuration config) {
+    public void open(OpenContext ctx) {
         blocked = getRuntimeContext()
             .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
     }
diff --git a/docs/content/docs/learn-flink/event_driven.md 
b/docs/content/docs/learn-flink/event_driven.md
index e03d4598e20..173b0829d31 100644
--- a/docs/content/docs/learn-flink/event_driven.md
+++ b/docs/content/docs/learn-flink/event_driven.md
@@ -75,7 +75,7 @@ public static class PseudoWindow extends
 
     @Override
     // Called once during initialization.
-    public void open(Configuration conf) {
+    public void open(OpenContext ctx) {
         . . .
     }
 
@@ -116,7 +116,7 @@ Things to be aware of:
 private transient MapState<Long, Float> sumOfTips;
 
 @Override
-public void open(Configuration conf) {
+public void open(OpenContext ctx) {
 
     MapStateDescriptor<Long, Float> sumDesc =
             new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 5ab5cb21180..3347bd48c0a 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String> 
{
   private transient Counter counter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.counter = getRuntimeContext()
       .getMetricGroup()
       .counter("myCounter");
@@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String, 
String> {
   private transient Counter counter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.counter = getRuntimeContext()
       .getMetricGroup()
       .counter("myCustomCounter", new CustomCounter());
@@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String, 
String> {
   private transient int valueToExpose = 0;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     getRuntimeContext()
       .getMetricGroup()
       .gauge("MyGauge", new Gauge<Integer>() {
@@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Histogram histogram;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.histogram = getRuntimeContext()
       .getMetricGroup()
       .histogram("myHistogram", new MyHistogram());
@@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Histogram histogram;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     com.codahale.metrics.Histogram dropwizardHistogram =
       new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
 
@@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Meter meter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     this.meter = getRuntimeContext()
       .getMetricGroup()
       .meter("myMeter", new MyMeter());
@@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
   private transient Meter meter;
 
   @Override
-  public void open(Configuration config) {
+  public void open(OpenContext ctx) {
     com.codahale.metrics.Meter dropwizardMeter = new 
com.codahale.metrics.Meter();
 
     this.meter = getRuntimeContext()
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 7a25c4abc72..d4ae07068d3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.Configuration;
 
 import java.io.Serializable;
 
@@ -69,7 +68,7 @@ public abstract class AbstractRichFunction implements 
RichFunction, Serializable
     // 
--------------------------------------------------------------------------------------------
 
     @Override
-    public void open(Configuration parameters) throws Exception {}
+    public void open(OpenContext openContext) throws Exception {}
 
     @Override
     public void close() throws Exception {}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
index e99e68a078d..42ef2a8af40 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
@@ -41,7 +41,7 @@ import org.apache.flink.annotation.Public;
  *
  *     private Map<Long, String> map;
  *
- *     public void open(Configuration cfg) throws Exception {
+ *     public void open(OpenContext ctx) throws Exception {
  *         getRuntimeContext().getBroadcastVariableWithInitializer("mapvar",
  *             new BroadcastVariableInitializer<Tuple2<Long, String>, 
Map<Long, String>>() {
  *
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
index 4ff5484b3b0..ff07bfc1816 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Public;
 
 /**
  * The {@link OpenContext} interface provides necessary information required 
by the {@link
  * RichFunction} when it is opened. The {@link OpenContext} is currently empty 
because it can be
  * used to add more methods without affecting the signature of {@code 
RichFunction#open}.
  */
-@PublicEvolving
+@Public
 public interface OpenContext {}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index ae83fb30f2b..f1aa70a97d9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.functions;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 
 /**
  * An base interface for all rich user-defined functions. This class defines 
methods for the life
@@ -30,51 +29,6 @@ import org.apache.flink.configuration.Configuration;
 @Public
 public interface RichFunction extends Function {
 
-    /**
-     * Initialization method for the function. It is called before the actual 
working methods (like
-     * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. 
For functions that are
-     * part of an iteration, this method will be invoked at the beginning of 
each iteration
-     * superstep.
-     *
-     * <p>The configuration object passed to the function can be used for 
configuration and
-     * initialization. The configuration contains all parameters that were 
configured on the
-     * function in the program composition.
-     *
-     * <pre>{@code
-     * public class MyFilter extends RichFilterFunction<String> {
-     *
-     *     private String searchString;
-     *
-     *     public void open(Configuration parameters) {
-     *         this.searchString = parameters.getString("foo");
-     *     }
-     *
-     *     public boolean filter(String value) {
-     *         return value.equals(searchString);
-     *     }
-     * }
-     * }</pre>
-     *
-     * <p>By default, this method does nothing.
-     *
-     * @param parameters The configuration containing the parameters attached 
to the contract.
-     * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime.
-     *     When the runtime catches an exception, it aborts the task and lets 
the fail-over logic
-     *     decide whether to retry the task execution.
-     * @see org.apache.flink.configuration.Configuration
-     * @deprecated This method is deprecated since Flink 1.19. The users are 
recommended to
-     *     implement {@code open(OpenContext openContext)} and implement 
{@code open(Configuration
-     *     parameters)} with an empty body instead. 1. If you implement {@code 
open(OpenContext
-     *     openContext)}, the {@code open(OpenContext openContext)} will be 
invoked and the {@code
-     *     open(Configuration parameters)} won't be invoked. 2. If you don't 
implement {@code
-     *     open(OpenContext openContext)}, the {@code open(Configuration 
parameters)} will be
-     *     invoked in the default implementation of the {@code 
open(OpenContext openContext)}.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
-     *     FLIP-344: Remove parameter in RichFunction#open </a>
-     */
-    @Deprecated
-    void open(Configuration parameters) throws Exception;
-
     /**
      * Initialization method for the function. It is called before the actual 
working methods (like
      * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. 
For functions that are
@@ -100,14 +54,6 @@ public interface RichFunction extends Function {
      * }
      * }</pre>
      *
-     * <p>By default, this method does nothing.
-     *
-     * <p>1. If you implement {@code open(OpenContext openContext)}, the 
{@code open(OpenContext
-     * openContext)} will be invoked and the {@code open(Configuration 
parameters)} won't be
-     * invoked. 2. If you don't implement {@code open(OpenContext 
openContext)}, the {@code
-     * open(Configuration parameters)} will be invoked in the default 
implementation of the {@code
-     * open(OpenContext openContext)}.
-     *
      * @param openContext The context containing information about the context 
in which the function
      *     is opened.
      * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime.
@@ -115,9 +61,7 @@ public interface RichFunction extends Function {
      *     decide whether to retry the task execution.
      */
     @PublicEvolving
-    default void open(OpenContext openContext) throws Exception {
-        open(new Configuration());
-    }
+    void open(OpenContext openContext) throws Exception;
 
     /**
      * Tear-down method for the user code. It is called after the last call to 
the main working
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index eb94f65a2f8..b36c0adaa2a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -232,7 +232,7 @@ public interface RuntimeContext {
      *
      *     private ValueState<Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getState(
      *                 new ValueStateDescriptor<Long>("count", 
LongSerializer.INSTANCE, 0L));
      *     }
@@ -269,7 +269,7 @@ public interface RuntimeContext {
      *
      *     private ListState<MyType> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getListState(
      *                 new ListStateDescriptor<>("myState", MyType.class));
      *     }
@@ -310,7 +310,7 @@ public interface RuntimeContext {
      *
      *     private ReducingState<Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getReducingState(
      *                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, 
Long.class));
      *     }
@@ -348,7 +348,7 @@ public interface RuntimeContext {
      *
      *     private AggregatingState<MyType, Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getAggregatingState(
      *                 new AggregatingStateDescriptor<>("sum", 
aggregateFunction, Long.class));
      *     }
@@ -388,7 +388,7 @@ public interface RuntimeContext {
      *
      *     private MapState<MyType, Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getMapState(
      *                 new MapStateDescriptor<>("sum", MyType.class, 
Long.class));
      *     }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java
similarity index 67%
copy from 
flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
copy to 
flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java
index 4ff5484b3b0..7ce3cbcaa30 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/WithConfigurationOpenContext.java
@@ -19,11 +19,18 @@
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
 
-/**
- * The {@link OpenContext} interface provides necessary information required 
by the {@link
- * RichFunction} when it is opened. The {@link OpenContext} is currently empty 
because it can be
- * used to add more methods without affecting the signature of {@code 
RichFunction#open}.
- */
+/** A special {@link OpenContext} for passing configuration to udf. */
 @PublicEvolving
-public interface OpenContext {}
+public class WithConfigurationOpenContext implements OpenContext {
+    private final Configuration configuration;
+
+    public WithConfigurationOpenContext(Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 18e2d667031..8ce0d9c2c64 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -46,7 +46,7 @@ public interface KeyedStateStore {
      *
      *     private ValueState<Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getState(
      *                 new ValueStateDescriptor<Long>("count", 
LongSerializer.INSTANCE, 0L));
      *     }
@@ -83,7 +83,7 @@ public interface KeyedStateStore {
      *
      *     private ListState<MyType> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getListState(
      *                 new ListStateDescriptor<>("myState", MyType.class));
      *     }
@@ -124,7 +124,7 @@ public interface KeyedStateStore {
      *
      *     private ReducingState<Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getReducingState(
      *                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, 
Long.class));
      *     }
@@ -162,7 +162,7 @@ public interface KeyedStateStore {
      *
      *     private AggregatingState<MyType, Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getAggregatingState(
      *                 new AggregatingStateDescriptor<>("sum", 
aggregateFunction, Long.class));
      *     }
@@ -202,7 +202,7 @@ public interface KeyedStateStore {
      *
      *     private MapState<MyType, Long> state;
      *
-     *     public void open(Configuration cfg) {
+     *     public void open(OpenContext ctx) {
      *         state = getRuntimeContext().getMapState(
      *                 new MapStateDescriptor<>("sum", MyType.class, 
Long.class));
      *     }
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
index 4aaf582ecc5..e5f609500d2 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -69,12 +68,6 @@ public abstract class RichIterativeCondition<T> extends 
IterativeCondition<T>
     @Override
     public void open(OpenContext openContext) throws Exception {}
 
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        throw new UnsupportedOperationException(
-                "This method is deprecated and shouldn't be invoked. Please 
use open(OpenContext) instead.");
-    }
-
     @Override
     public void close() throws Exception {}
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
index 5bbf8c981fa..652d9253d45 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
@@ -21,7 +21,6 @@ package org.apache.flink.state.api.functions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 import java.util.Set;
@@ -32,7 +31,7 @@ import java.util.Set;
  * <p>For every key {@link #readKey(Object, Context, Collector)} is invoked. 
This can produce zero
  * or more elements as output.
  *
- * <p><b>NOTE:</b> State descriptors must be eagerly registered in {@code 
open(Configuration)}. Any
+ * <p><b>NOTE:</b> State descriptors must be eagerly registered in {@code 
open(OpenContext)}. Any
  * attempt to dynamically register states inside of {@code readKey} will 
result in a {@code
  * RuntimeException}.
  *
@@ -51,15 +50,6 @@ public abstract class KeyedStateReaderFunction<K, OUT> 
extends AbstractRichFunct
 
     private static final long serialVersionUID = 3873843034140417407L;
 
-    /**
-     * Initialization method for the function. It is called before {@link 
#readKey(Object, Context,
-     * Collector)} and thus suitable for one time setup work.
-     *
-     * <p>This is the only method that my register state descriptors within a 
{@code
-     * KeyedStateReaderFunction}.
-     */
-    public abstract void open(Configuration parameters) throws Exception;
-
     /**
      * Process one key from the restored state backend.
      *
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
index 17845da02ef..ce4094a4d0b 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
@@ -110,12 +109,6 @@ public class SavepointDeepCopyTest extends 
AbstractTestBaseJUnit4 {
             state = getRuntimeContext().getState(stateDescriptor);
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            throw new UnsupportedOperationException(
-                    "This method is deprecated and shouldn't be invoked. 
Please use open(OpenContext) instead.");
-        }
-
         @Override
         public void readKey(String key, Context ctx, Collector<Tuple2<String, 
String>> out)
                 throws Exception {
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
index 75f9f6f3dee..de3bdcd5f3a 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
@@ -112,12 +112,6 @@ public abstract class SavepointReaderKeyedStateITCase<B 
extends StateBackend>
             state = getRuntimeContext().getState(valueState);
         }
 
-        @Override
-        public void open(Configuration parameters) {
-            throw new UnsupportedOperationException(
-                    "This method is deprecated and shouldn't be invoked. 
Please use open(OpenContext) instead.");
-        }
-
         @Override
         public void readKey(Integer key, Context ctx, Collector<Pojo> out) 
throws Exception {
             Pojo pojo = new Pojo();
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
index c287cdb5598..9b9f845bba1 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
@@ -262,12 +262,6 @@ public class KeyedStateInputFormatTest {
             state = getRuntimeContext().getState(stateDescriptor);
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            throw new UnsupportedOperationException(
-                    "This method is deprecated and shouldn't be invoked. 
Please use open(OpenContext) instead.");
-        }
-
         @Override
         public void readKey(
                 Integer key, KeyedStateReaderFunction.Context ctx, 
Collector<Integer> out)
@@ -284,12 +278,6 @@ public class KeyedStateInputFormatTest {
             state = getRuntimeContext().getState(stateDescriptor);
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            throw new UnsupportedOperationException(
-                    "This method is deprecated and shouldn't be invoked. 
Please use open(OpenContext) instead.");
-        }
-
         @Override
         public void readKey(
                 Integer key, KeyedStateReaderFunction.Context ctx, 
Collector<Integer> out)
@@ -306,12 +294,6 @@ public class KeyedStateInputFormatTest {
             getRuntimeContext().getState(stateDescriptor);
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            throw new UnsupportedOperationException(
-                    "This method is deprecated and shouldn't be invoked. 
Please use open(OpenContext) instead.");
-        }
-
         @Override
         public void readKey(
                 Integer key, KeyedStateReaderFunction.Context ctx, 
Collector<Integer> out)
@@ -360,12 +342,6 @@ public class KeyedStateInputFormatTest {
             state = getRuntimeContext().getState(stateDescriptor);
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            throw new UnsupportedOperationException(
-                    "This method is deprecated and shouldn't be invoked. 
Please use open(OpenContext) instead.");
-        }
-
         @Override
         public void readKey(
                 Integer key, KeyedStateReaderFunction.Context ctx, 
Collector<Integer> out)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 658e026f630..9388d88cb15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1496,7 +1496,7 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable
             FunctionUtils.openFunction(stub, DefaultOpenContext.INSTANCE);
         } catch (Throwable t) {
             throw new Exception(
-                    "The user defined 'open(Configuration)' method in "
+                    "The user defined 'open(OpenContext)' method in "
                             + stub.getClass().toString()
                             + " caused an exception: "
                             + t.getMessage(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index af3a2c7a9ff..f46855cc559 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -131,7 +131,7 @@ import 
org.apache.flink.runtime.state.FunctionSnapshotContext;
  *
  *     private ValueState<Long> count;
  *
- *     public void open(Configuration cfg) throws Exception {
+ *     public void open(OpenContext ctx) throws Exception {
  *         count = getRuntimeContext().getState(new 
ValueStateDescriptor<>("myCount", Long.class));
  *     }
  *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
index c0894e52414..f7c8b2c55e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -57,26 +58,10 @@ public class OutputFormatSinkFunction<IN> extends 
RichSinkFunction<IN>
         this.format = format;
     }
 
-    /**
-     * Initialization method for the {@link OutputFormatSinkFunction}.
-     *
-     * @param parameters The configuration containing the parameters attached 
to the contract.
-     * @throws Exception if an error happens.
-     * @deprecated This method is deprecated since Flink 1.19. The users are 
recommended to
-     *     implement {@code open(OpenContext openContext)} and override {@code 
open(Configuration
-     *     parameters)} with an empty body instead. 1. If you implement {@code 
open(OpenContext
-     *     openContext)}, the {@code open(OpenContext openContext)} will be 
invoked and the {@code
-     *     open(Configuration parameters)} won't be invoked. 2. If you don't 
implement {@code
-     *     open(OpenContext openContext)}, the {@code open(Configuration 
parameters)} will be
-     *     invoked in the default implementation of the {@code 
open(OpenContext openContext)}.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231";>
-     *     FLIP-344: Remove parameter in RichFunction#open </a>
-     */
-    @Deprecated
     @Override
-    public void open(Configuration parameters) throws Exception {
+    public void open(OpenContext openContext) throws Exception {
         RuntimeContext context = getRuntimeContext();
-        format.configure(parameters);
+        format.configure(new Configuration());
         int indexInSubtaskGroup = 
context.getTaskInfo().getIndexOfThisSubtask();
         int currentNumberOfSubtasks = 
context.getTaskInfo().getNumberOfParallelSubtasks();
         format.open(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
index e9c7b546e91..5ece13b2a54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 /**
@@ -69,25 +69,9 @@ public class PrintSinkFunction<IN> extends 
RichSinkFunction<IN>
         writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
     }
 
-    /**
-     * Initialization method for the {@link PrintSinkFunction}.
-     *
-     * @param parameters The configuration containing the parameters attached 
to the contract.
-     * @throws Exception if an error happens.
-     * @deprecated This method is deprecated since Flink 1.19. The users are 
recommended to
-     *     implement {@code open(OpenContext openContext)} and override {@code 
open(Configuration
-     *     parameters)} with an empty body instead. 1. If you implement {@code 
open(OpenContext
-     *     openContext)}, the {@code open(OpenContext openContext)} will be 
invoked and the {@code
-     *     open(Configuration parameters)} won't be invoked. 2. If you don't 
implement {@code
-     *     open(OpenContext openContext)}, the {@code open(Configuration 
parameters)} will be
-     *     invoked in the default implementation of the {@code 
open(OpenContext openContext)}.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231";>
-     *     FLIP-344: Remove parameter in RichFunction#open </a>
-     */
-    @Deprecated
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
         StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
         writer.open(
                 context.getTaskInfo().getIndexOfThisSubtask(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 40162f3ca35..d6736526466 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.SerializableObject;
 
 import org.slf4j.Logger;
@@ -129,25 +129,8 @@ public class SocketClientSink<IN> extends 
RichSinkFunction<IN> {
     // ------------------------------------------------------------------------
     //  Life cycle
     // ------------------------------------------------------------------------
-
-    /**
-     * Initialize the connection with the Socket in the server.
-     *
-     * @param parameters The configuration containing the parameters attached 
to the contract.
-     * @throws Exception if an error happens.
-     * @deprecated This method is deprecated since Flink 1.19. The users are 
recommended to
-     *     implement {@code open(OpenContext openContext)} and override {@code 
open(Configuration
-     *     parameters)} with an empty body instead. 1. If you implement {@code 
open(OpenContext
-     *     openContext)}, the {@code open(OpenContext openContext)} will be 
invoked and the {@code
-     *     open(Configuration parameters)} won't be invoked. 2. If you don't 
implement {@code
-     *     open(OpenContext openContext)}, the {@code open(Configuration 
parameters)} will be
-     *     invoked in the default implementation of the {@code 
open(OpenContext openContext)}.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231";>
-     *     FLIP-344: Remove parameter in RichFunction#open </a>
-     */
-    @Deprecated
     @Override
-    public void open(Configuration parameters) throws Exception {
+    public void open(OpenContext openContext) throws Exception {
         try {
             synchronized (lock) {
                 createConnection();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index 78962212951..a44e60c7e09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.util.SplittableIterator;
 
 import java.util.Iterator;
@@ -46,24 +46,8 @@ public class FromSplittableIteratorFunction<T> extends 
RichParallelSourceFunctio
         this.fullIterator = iterator;
     }
 
-    /**
-     * Initialization method for the {@link FromSplittableIteratorFunction}.
-     *
-     * @param parameters The configuration containing the parameters attached 
to the contract.
-     * @throws Exception if an error happens.
-     * @deprecated This method is deprecated since Flink 1.19. The users are 
recommended to
-     *     implement {@code open(OpenContext openContext)} and override {@code 
open(Configuration
-     *     parameters)} with an empty body instead. 1. If you implement {@code 
open(OpenContext
-     *     openContext)}, the {@code open(OpenContext openContext)} will be 
invoked and the {@code
-     *     open(Configuration parameters)} won't be invoked. 2. If you don't 
implement {@code
-     *     open(OpenContext openContext)}, the {@code open(Configuration 
parameters)} will be
-     *     invoked in the default implementation of the {@code 
open(OpenContext openContext)}.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231";>
-     *     FLIP-344: Remove parameter in RichFunction#open </a>
-     */
-    @Deprecated
     @Override
-    public void open(Configuration parameters) throws Exception {
+    public void open(OpenContext openContext) throws Exception {
         int numberOfSubTasks = 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
         int indexofThisSubTask = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
         iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask];
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index e91f7e9fc03..bf76308424d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -114,7 +114,7 @@ class AbstractUdfStreamOperatorLifecycleTest {
 
     private static final String ALL_METHODS_RICH_FUNCTION =
             "[close[], getIterationRuntimeContext[], getRuntimeContext[]"
-                    + ", open[class 
org.apache.flink.configuration.Configuration], open[interface 
org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface 
"
+                    + ", open[interface 
org.apache.flink.api.common.functions.OpenContext], setRuntimeContext[interface 
"
                     + "org.apache.flink.api.common.functions.RuntimeContext]]";
 
     private static final List<String> ACTUAL_ORDER_TRACKING =
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
index aa18c5608ef..5facea46ab4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -20,7 +20,9 @@ package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.WithConfigurationOpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -62,10 +64,16 @@ public class FunctionContext {
     public FunctionContext(
             @Nullable RuntimeContext context,
             @Nullable ClassLoader userClassLoader,
-            @Nullable Configuration jobParameters) {
+            @Nullable OpenContext openContext) {
         this.context = context;
         this.userClassLoader = userClassLoader;
-        this.jobParameters = jobParameters != null ? jobParameters.toMap() : 
null;
+        if (openContext instanceof WithConfigurationOpenContext) {
+            Configuration configuration =
+                    ((WithConfigurationOpenContext) 
openContext).getConfiguration();
+            this.jobParameters = configuration.toMap();
+        } else {
+            this.jobParameters = null;
+        }
     }
 
     public FunctionContext(RuntimeContext context) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
index bd9e4d8f253..5fcede82e7d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen
 
+import org.apache.flink.api.common.functions.{DefaultOpenContext, OpenContext}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.Indenter.toISC
@@ -74,7 +75,7 @@ object CollectorCodeGenerator {
         }
 
         @Override
-        public void open(${className[Configuration]} parameters) throws 
Exception {
+        public void open(${className[OpenContext]} openContext) throws 
Exception {
           ${ctx.reuseOpenCode()}
         }
 
@@ -142,7 +143,7 @@ object CollectorCodeGenerator {
         }
 
         @Override
-        public void open(${className[Configuration]} parameters) throws 
Exception {
+        public void open(${className[OpenContext]} openContext) throws 
Exception {
           ${ctx.reuseOpenCode()}
         }
 
@@ -180,7 +181,7 @@ object CollectorCodeGenerator {
       s"""
          |$collectorTerm = new ${generatedCollector.getClassName}();
          |$collectorTerm.setRuntimeContext(getRuntimeContext());
-         |$collectorTerm.open(new ${className[Configuration]}());
+         |$collectorTerm.open(new ${className[DefaultOpenContext]}());
          |""".stripMargin
     ctx.addReusableOpenStatement(openCollector)
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 63616c0a409..11a4ac1f15b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.functions.{MapFunction, OpenContext, 
RichMapFunction}
+import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, 
OpenContext, RichMapFunction, WithConfigurationOpenContext}
 import org.apache.flink.configuration.{Configuration, PipelineOptions, 
ReadableConfig}
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.data.{DecimalData, GenericRowData, TimestampData}
@@ -107,7 +107,7 @@ class ExpressionReducer(
       .getOrElse(new Configuration)
     val reduced =
       try {
-        richMapFunction.open(parameters)
+        richMapFunction.open(new WithConfigurationOpenContext(parameters))
         // execute
         richMapFunction.map(EMPTY_ROW)
       } catch {
@@ -315,7 +315,7 @@ class ConstantCodeGeneratorContext(tableConfig: 
ReadableConfig, classLoader: Cla
     super.addReusableFunction(
       function,
       classOf[FunctionContext],
-      Seq("null", "this.getClass().getClassLoader()", "parameters"))
+      Seq("null", "this.getClass().getClassLoader()", "openContext"))
   }
 
   override def addReusableConverter(dataType: DataType, classLoaderTerm: 
String = null): String = {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index 76a5c4ca460..3b5943d6ab6 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -154,7 +154,7 @@ object FunctionCodeGenerator {
         ${ctx.reuseConstructorCode(funcName)}
 
         @Override
-        public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
+        public void open(${classOf[OpenContext].getCanonicalName} openContext) 
throws Exception {
           ${ctx.reuseOpenCode()}
         }
 
@@ -230,7 +230,7 @@ object FunctionCodeGenerator {
         ${ctx.reuseConstructorCode(funcName)}
 
         @Override
-        public void open(${className[Configuration]} parameters) throws 
Exception {
+        public void open(${className[OpenContext]} openContext) throws 
Exception {
           ${ctx.reuseOpenCode()}
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index fdc7ac8036b..4256c90b596 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen
 
+import org.apache.flink.api.common.functions.DefaultOpenContext
 import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.metrics.Gauge
 import org.apache.flink.table.data.{RowData, TimestampData}
@@ -157,7 +158,7 @@ object LongHashJoinGenerator {
     val condRefs = ctx.addReusableObject(condFunc.getReferences, "condRefs")
     ctx.addReusableInitStatement(s"condFunc = new 
${condFunc.getClassName}($condRefs);")
     
ctx.addReusableOpenStatement(s"condFunc.setRuntimeContext(getRuntimeContext());")
-    ctx.addReusableOpenStatement(s"condFunc.open(new 
${className[Configuration]}());")
+    ctx.addReusableOpenStatement(s"condFunc.open(new 
${className[DefaultOpenContext]}());")
     ctx.addReusableCloseStatement(s"condFunc.close();")
 
     val leftIsBuildTerm = newName(ctx, "leftIsBuild")
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 45a615707f8..5aa5b6c8d20 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function, 
OpenContext}
 import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.streaming.api.functions.async.AsyncFunction
 import org.apache.flink.table.api.ValidationException
@@ -381,7 +381,7 @@ object LookupJoinCodeGenerator {
         }
 
         @Override
-        public void open(${className[Configuration]} parameters) throws 
Exception {
+        public void open(${className[OpenContext]} openContext) throws 
Exception {
           ${ctx.reuseOpenCode()}
         }
 
@@ -492,7 +492,7 @@ object LookupJoinCodeGenerator {
         }
 
         @Override
-        public void open(${className[Configuration]} parameters) throws 
Exception {
+        public void open(${className[OpenContext]} openContext) throws 
Exception {
           ${ctx.reuseOpenCode()}
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
index e15c2c8a205..18f3337902f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier
+import org.apache.flink.api.common.functions.OpenContext
 import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -87,7 +88,7 @@ object WatermarkGeneratorCodeGenerator {
         }
 
         @Override
-        public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
+        public void open(${classOf[OpenContext].getCanonicalName} openContext) 
throws Exception {
           ${ctx.reuseOpenCode()}
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
index 5ad355e19ee..f04496d4d90 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.api.common.functions.{AbstractRichFunction, 
RichFunction}
+import org.apache.flink.api.common.functions.{AbstractRichFunction, 
OpenContext, RichFunction}
 import org.apache.flink.configuration.{Configuration, ReadableConfig}
 import org.apache.flink.table.api.{DataTypes, TableException}
 import org.apache.flink.table.api.Expressions.callSql
@@ -630,7 +630,7 @@ object BridgingFunctionGenUtil {
            |    ${ctx.reuseInitCode()}
            |  }
            |
-           |  public void open(${className[Configuration]} parameters) throws 
Exception {
+           |  public void open(${className[OpenContext]} openContext) throws 
Exception {
            |    ${ctx.reuseOpenCode()}
            |  }
            |
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index ffbdfec752c..3f49c9bf116 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.expressions.utils
 
 import org.apache.flink.api.common.{TaskInfo, TaskInfoImpl}
-import org.apache.flink.api.common.functions.{MapFunction, RichFunction, 
RichMapFunction}
+import org.apache.flink.api.common.functions.{DefaultOpenContext, MapFunction, 
OpenContext, RichFunction, RichMapFunction}
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
@@ -241,7 +241,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean = 
true) {
         Collections.emptyMap(),
         null)
       richMapper.setRuntimeContext(t)
-      richMapper.open(new Configuration())
+      richMapper.open(DefaultOpenContext.INSTANCE)
     }
 
     val testRow = if (containsLegacyTypes) {
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
index 0b8f2a6fc35..2851a100cd3 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.runtime.operators.join;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -332,7 +332,7 @@ class LookupJoinHarnessTest {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public void open(Configuration parameters) throws Exception {
+        public void open(OpenContext context) throws Exception {
             // do nothing
         }
 
diff --git a/pom.xml b/pom.xml
index 907b938e62d..596e9ca4946 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2339,6 +2339,19 @@ under the License.
                                                                <!-- Mark these 
2 methods to @Internal. Tracked under FLINK-34130, should be removed in 2.0 -->
                                                                
<exclude>org.apache.flink.configuration.Configuration#getBytes(java.lang.String,byte[])</exclude>
                                                                
<exclude>org.apache.flink.configuration.Configuration#setBytes(java.lang.String,byte[])</exclude>
+                                                               <!-- FLIP-344: 
Remove parameter in RichFunction#open in 2.0 -->
+                                                               
<exclude>org.apache.flink.api.common.functions.AbstractRichFunction#open(org.apache.flink.configuration.Configuration)</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichCoGroupFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichCrossFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichFilterFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichFlatJoinFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichFlatMapFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichGroupCombineFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichGroupReduceFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichJoinFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichMapFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichMapPartitionFunction</exclude>
+                                                               
<exclude>org.apache.flink.api.common.functions.RichReduceFunction</exclude>
                                                                <!-- 
FLINK-34085 Deprecated string config should be removed in 2.0 -->
                                                                
<exclude>org.apache.flink.configuration.ConfigConstants</exclude>
                                                                <!-- 
FLINK-35886: WatermarksWithIdleness constructor was marked as deprecated -->

Reply via email to