xintongsong commented on code in PR #23058:
URL: https://github.com/apache/flink/pull/23058#discussion_r1307175479


##########
flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java:
##########
@@ -61,9 +61,30 @@ public interface RichFunction extends Function {
      *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
      *     decide whether to retry the task execution.
      * @see org.apache.flink.configuration.Configuration
+     * @deprecated This method is deprecated since 1.18 and will be replaced by {@link

Review Comment:
   ```suggestion
        * @deprecated This method is deprecated since 1.19 and will be replaced by {@link
   ```



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java:
##########
@@ -61,9 +61,30 @@ public interface RichFunction extends Function {
      *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
      *     decide whether to retry the task execution.
      * @see org.apache.flink.configuration.Configuration
+     * @deprecated This method is deprecated since 1.18 and will be replaced by {@link
+     *     #open(OpenContext)}.
      */
+    @Deprecated
     void open(Configuration parameters) throws Exception;

Review Comment:
   Use cases that call this method should be migrated.



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.Public;
+
+/** A OpenContext contains information about the context in which functions are opened. */
+@Public
+public class OpenContext {}

Review Comment:
   Let's make this `@PublicEvolving`, and explain why the empty context is 
needed.



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java:
##########
@@ -61,9 +61,30 @@ public interface RichFunction extends Function {
      *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
      *     decide whether to retry the task execution.
      * @see org.apache.flink.configuration.Configuration
+     * @deprecated This method is deprecated since 1.18 and will be replaced by {@link
+     *     #open(OpenContext)}.
      */
+    @Deprecated
     void open(Configuration parameters) throws Exception;
 
+    /**
+     * Initialization method for the function. It is called before the actual working methods (like
+     * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
+     * part of an iteration, this method will be invoked at the beginning of each iteration
+     * superstep.
+     *
+     * <p>By default, this method does nothing.
+     *
+     * @param openContext The context containing information about the context in which the function
+     *     is opened.
+     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
+     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
+     *     decide whether to retry the task execution.
+     */
+     */
+    default void open(OpenContext openContext) throws Exception {
+        open(new Configuration());

Review Comment:
   I think we should explain that if not implemented, the default behavior will 
be calling the deprecated open method with an empty configuration.
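
The fallback described in this comment can be illustrated with a minimal, self-contained sketch. The `Configuration`, `OpenContext`, and `RichFunction` types below are simplified stand-ins written for this example, not the real Flink classes; only the default method body is taken from the diff. `LegacyFunction` is a hypothetical user function that overrides only the deprecated method.

```java
import java.util.ArrayList;
import java.util.List;

public class OpenDelegationSketch {
    /** Simplified stand-in for org.apache.flink.configuration.Configuration. */
    static class Configuration {}

    /** Stand-in for the empty OpenContext introduced by this PR. */
    interface OpenContext {}

    /** Simplified stand-in for RichFunction, keeping only the two open methods. */
    interface RichFunction {
        @Deprecated
        void open(Configuration parameters) throws Exception;

        // Default body as in the diff: if a function does not override this
        // method, the deprecated one is called with an empty configuration.
        default void open(OpenContext openContext) throws Exception {
            open(new Configuration());
        }
    }

    /** Hypothetical user function that only overrides the deprecated method. */
    static class LegacyFunction implements RichFunction {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open(Configuration parameters) {
            calls.add("open(Configuration)");
        }
    }

    public static void main(String[] args) throws Exception {
        LegacyFunction fn = new LegacyFunction();
        // The caller uses the new entry point; the default method forwards
        // the call to the legacy override.
        fn.open(new OpenContext() {});
        System.out.println(fn.calls); // prints [open(Configuration)]
    }
}
```

Under these assumptions, existing functions keep working when callers switch to `open(OpenContext)`, which is the behavior the Javadoc would need to spell out in place of "By default, this method does nothing."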



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/OpenContext.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.Public;
+
+/** A OpenContext contains information about the context in which functions are opened. */
+@Public
+public class OpenContext {}

Review Comment:
   And this should be an interface.
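
Taken together with the `@PublicEvolving` comment on the same file, the requested shape might look roughly like the sketch below. The nested `PublicEvolving` annotation is a local stand-in so that the example compiles without Flink on the classpath, and the Javadoc wording is an assumption about the rationale (keeping the `open` signature stable while allowing later additions), not text from the PR.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

public class OpenContextSketch {
    /** Local stand-in for org.apache.flink.annotation.PublicEvolving. */
    @Documented
    @Target(ElementType.TYPE)
    @interface PublicEvolving {}

    /**
     * Provides information about the context in which a function is opened.
     *
     * <p>Assumed rationale: the interface is intentionally empty for now, so
     * that accessors can be added later without changing the signature of
     * RichFunction#open(OpenContext).
     */
    @PublicEvolving
    interface OpenContext {}

    public static void main(String[] args) {
        // An interface can be implemented ad hoc wherever a context is needed.
        OpenContext context = new OpenContext() {};
        System.out.println(OpenContext.class.isInterface()); // prints true
        System.out.println(context != null);                 // prints true
    }
}
```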



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java:
##########
@@ -61,9 +61,30 @@ public interface RichFunction extends Function {
      *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
      *     decide whether to retry the task execution.
      * @see org.apache.flink.configuration.Configuration
+     * @deprecated This method is deprecated since 1.18 and will be replaced by {@link
+     *     #open(OpenContext)}.
      */
+    @Deprecated
     void open(Configuration parameters) throws Exception;
 
+    /**
+     * Initialization method for the function. It is called before the actual working methods (like
+     * <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
+     * part of an iteration, this method will be invoked at the beginning of each iteration
+     * superstep.
+     *
+     * <p>By default, this method does nothing.
+     *
+     * @param openContext The context containing information about the context in which the function
+     *     is opened.
+     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
+     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
+     *     decide whether to retry the task execution.
+     */
+     */
+    default void open(OpenContext openContext) throws Exception {
+        open(new Configuration());

Review Comment:
   We should explain in a comment why the context is needed.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java:
##########
@@ -111,7 +111,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 
     private static final String ALL_METHODS_RICH_FUNCTION =
             "[close[], getIterationRuntimeContext[], getRuntimeContext[]"
-                    + ", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface "
+                    + ", open[class org.apache.flink.api.common.functions.OpenContext], open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface "

Review Comment:
   I think we should also verify that the new method is called at the right places.
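
One way to check this is to record the order of lifecycle calls and assert on it. The sketch below is not based on `AbstractUdfStreamOperatorLifecycleTest`; all types are simplified stand-ins, and `runLifecycle` is a hypothetical driver standing in for whatever component opens the function before processing records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OpenCallOrderSketch {
    /** Simplified stand-in for org.apache.flink.configuration.Configuration. */
    static class Configuration {}

    /** Stand-in for the empty OpenContext introduced by this PR. */
    interface OpenContext {}

    /** Simplified stand-in for RichFunction, keeping only the two open methods. */
    interface RichFunction {
        @Deprecated
        void open(Configuration parameters) throws Exception;

        default void open(OpenContext openContext) throws Exception {
            open(new Configuration());
        }
    }

    /** Records every lifecycle call so a test can assert on the order. */
    static class RecordingFunction implements RichFunction {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open(Configuration parameters) {
            calls.add("open(Configuration)");
        }

        @Override
        public void open(OpenContext openContext) {
            calls.add("open(OpenContext)");
        }

        String map(String value) {
            calls.add("map");
            return value;
        }
    }

    /** Hypothetical driver: opens the function, then processes one record. */
    static List<String> runLifecycle(RecordingFunction fn) throws Exception {
        fn.open(new OpenContext() {});
        fn.map("record");
        return fn.calls;
    }

    public static void main(String[] args) throws Exception {
        List<String> calls = runLifecycle(new RecordingFunction());
        // The new method runs first, and the deprecated one is not called
        // because the function overrides open(OpenContext) directly.
        List<String> expected = Arrays.asList("open(OpenContext)", "map");
        if (!calls.equals(expected)) {
            throw new AssertionError("unexpected call order: " + calls);
        }
        System.out.println(calls); // prints [open(OpenContext), map]
    }
}
```

A real test would drive the function through the operator or test harness under review rather than a hand-written driver; the point here is only the shape of the assertion.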


