This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 76a4134838 [flink] Adopt open(OpenContext) in RichFunction (#4581)
76a4134838 is described below

commit 76a4134838058559fbba2f6a022d7aa3efcd45b0
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 25 20:10:38 2024 +0800

    [flink] Adopt open(OpenContext) in RichFunction (#4581)
---
 .../flink/api/common/functions/OpenContext.java    | 29 ++++++++++++++++++++++
 .../flink/api/common/functions/OpenContext.java    | 29 ++++++++++++++++++++++
 .../flink/api/common/functions/OpenContext.java    | 29 ++++++++++++++++++++++
 .../flink/api/common/functions/OpenContext.java    | 29 ++++++++++++++++++++++
 .../paimon/flink/kafka/KafkaSinkFunction.java      | 12 ++++++++-
 .../cdc/CdcDynamicTableParsingProcessFunction.java | 12 ++++++++-
 .../cdc/CdcMultiTableParsingProcessFunction.java   | 12 ++++++++-
 .../flink/sink/cdc/CdcParsingProcessFunction.java  | 12 ++++++++-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  | 12 ++++++++-
 .../paimon/flink/service/QueryAddressRegister.java | 12 ++++++++-
 .../paimon/flink/service/QueryFileMonitor.java     | 12 ++++++++-
 .../apache/paimon/flink/shuffle/RangeShuffle.java  | 15 +++++++++--
 .../flink/sink/RowDataStoreWriteOperator.java      | 28 ++++++++++++++++-----
 .../org/apache/paimon/flink/sorter/SortUtils.java  | 28 ++++++++++++++++++---
 .../flink/source/BucketUnawareCompactSource.java   | 12 ++++++++-
 .../operator/CombinedAwareBatchSourceFunction.java | 12 ++++++++-
 .../CombinedAwareStreamingSourceFunction.java      | 12 ++++++++-
 .../operator/CombinedCompactorSourceFunction.java  | 12 ++++++++-
 .../CombinedUnawareBatchSourceFunction.java        | 12 ++++++++-
 .../CombinedUnawareStreamingSourceFunction.java    | 12 ++++++++-
 20 files changed, 319 insertions(+), 24 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty; it exists so that
+ * more methods can be added later without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git 
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
 
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty; it exists so that
+ * more methods can be added later without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git 
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
 
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty; it exists so that
+ * more methods can be added later without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty; it exists so that
+ * more methods can be added later without affecting the signature of {@code RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
index 72a177adce..41e7141cf4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.kafka;
 import org.apache.paimon.flink.sink.LogSinkFunction;
 import org.apache.paimon.table.sink.SinkRecord;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -65,7 +66,16 @@ public class KafkaSinkFunction extends 
FlinkKafkaProducer<SinkRecord> implements
         this.writeCallback = writeCallback;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration configuration) throws Exception {
         super.open(configuration);
         Callback baseCallback = requireNonNull(callback);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 0961ff1600..886e33e204 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.types.DataField;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -74,7 +75,16 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
         this.parserFactory = parserFactory;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         parser = parserFactory.create();
         catalog = catalogLoader.load();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index b18a05c280..4c5e0600bb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.types.DataField;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
@@ -51,7 +52,16 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
         this.parserFactory = parserFactory;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         parser = parserFactory.create();
         updatedDataFieldsOutputTags = new HashMap<>();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 3456634942..eec228f3c0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.types.DataField;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -50,7 +51,16 @@ public class CdcParsingProcessFunction<T> extends 
ProcessFunction<T, CdcRecord>
         this.parserFactory = parserFactory;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         parser = parserFactory.create();
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index c2e928bd4a..4f02b784c2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.slf4j.Logger;
@@ -73,7 +74,16 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         this.catalogLoader = catalogLoader;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) {
         this.catalog = catalogLoader.load();
         this.allowUpperCase = this.catalog.allowUpperCase();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
index df3cf7abf2..524f2e5f01 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -23,6 +23,7 @@ import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -44,7 +45,16 @@ public class QueryAddressRegister extends 
RichSinkFunction<InternalRow> {
         this.serviceManager = ((FileStoreTable) 
table).store().newServiceManager();
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         this.executors = new TreeMap<>();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 43cf654e91..02f8a65411 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -31,6 +31,7 @@ import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.system.FileMonitorTable;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -70,7 +71,16 @@ public class QueryFileMonitor extends 
RichSourceFunction<InternalRow> {
                         .toMillis();
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) 
table);
         ReadBuilder readBuilder = monitorTable.newReadBuilder();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 5410413043..8760f1dc5f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -27,6 +27,7 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializableSupplier;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -182,9 +183,19 @@ public class RangeShuffle {
             this.isSortBySize = isSortBySize;
         }
 
-        @Override
+        /**
+         * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink
+         * 1.18-.
+         */
+        public void open(OpenContext openContext) throws Exception {
+            open(new Configuration());
+        }
+
+        /**
+         * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink
+         * 2.0+.
+         */
         public void open(Configuration parameters) throws Exception {
-            super.open(parameters);
             InternalRowToSizeVisitor internalRowToSizeVisitor = new 
InternalRowToSizeVisitor();
             fieldSizeCalculator =
                     rowType.getFieldTypes().stream()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 07fe275543..2b25f07466 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -23,6 +23,8 @@ import org.apache.paimon.flink.log.LogWriteCallback;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -42,6 +44,8 @@ import 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Objects;
 
@@ -97,17 +101,29 @@ public class RowDataStoreWriteOperator extends 
TableWriteOperator<InternalRow> {
 
         this.sinkContext = new SimpleContext(getProcessingTimeService());
         if (logSinkFunction != null) {
-            // to stay compatible with Flink 1.18-
-            if (logSinkFunction instanceof RichFunction) {
-                RichFunction richFunction = (RichFunction) logSinkFunction;
-                richFunction.open(new Configuration());
-            }
-
+            openFunction(logSinkFunction);
             logCallback = new LogWriteCallback();
             logSinkFunction.setWriteCallback(logCallback);
         }
     }
 
+    private static void openFunction(Function function) throws Exception {
+        if (function instanceof RichFunction) {
+            RichFunction richFunction = (RichFunction) function;
+
+            try {
+                Method method = RichFunction.class.getDeclaredMethod("open", 
OpenContext.class);
+                method.invoke(richFunction, new OpenContext() {});
+                return;
+            } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                // to stay compatible with Flink 1.18-
+            }
+
+            Method method = RichFunction.class.getDeclaredMethod("open", 
Configuration.class);
+            method.invoke(richFunction, new Configuration());
+        }
+    }
+
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index f590c2fb7f..b30e145512 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyProjectedRow;
 import org.apache.paimon.utils.SerializableSupplier;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -119,9 +120,19 @@ public class SortUtils {
                         .map(
                                 new RichMapFunction<RowData, Tuple2<KEY, 
RowData>>() {
 
-                                    @Override
+                                    /**
+                                     * Do not annotate with <code>@Override</code> here to maintain
+                                     * compatibility with Flink 1.18-.
+                                     */
+                                    public void open(OpenContext openContext) 
throws Exception {
+                                        open(new Configuration());
+                                    }
+
+                                    /**
+                                     * Do not annotate with <code>@Override</code> here to maintain
+                                     * compatibility with Flink 2.0+.
+                                     */
                                     public void open(Configuration parameters) 
throws Exception {
-                                        super.open(parameters);
                                         shuffleKeyAbstract.open();
                                     }
 
@@ -172,7 +183,18 @@ public class SortUtils {
 
                                 private transient KeyProjectedRow 
keyProjectedRow;
 
-                                @Override
+                                /**
+                                 * Do not annotate with <code>@Override</code> here to maintain
+                                 * compatibility with Flink 1.18-.
+                                 */
+                                public void open(OpenContext openContext) {
+                                    open(new Configuration());
+                                }
+
+                                /**
+                                 * Do not annotate with <code>@Override</code> here to maintain
+                                 * compatibility with Flink 2.0+.
+                                 */
                                 public void open(Configuration parameters) {
                                     keyProjectedRow = new 
KeyProjectedRow(valueProjectionMap);
                                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index d306c7d8e1..e768c717dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -72,7 +73,16 @@ public class BucketUnawareCompactSource extends 
RichSourceFunction<UnawareAppend
         this.filter = filter;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         compactionCoordinator =
                 new UnawareAppendTableCompactionCoordinator(table, streaming, 
filter);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index cee6081aa2..2157be51ae 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -62,7 +63,16 @@ public class CombinedAwareBatchSourceFunction
         super(catalogLoader, includingPattern, excludingPattern, 
databasePattern, false);
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         tableScan =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
index bff690ea30..01e0127e9f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -59,7 +60,16 @@ public class CombinedAwareStreamingSourceFunction
         this.monitorInterval = monitorInterval;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         tableScan =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
index 1964927b5c..02bb878650 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.source.Split;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
@@ -70,7 +71,16 @@ public abstract class CombinedCompactorSourceFunction<T> extends RichSourceFunct
         this.isStreaming = isStreaming;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         isRunning = new AtomicBoolean(true);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 8ec8d5f2c1..6a40f10ada 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -29,6 +29,7 @@ import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -69,7 +70,16 @@ public class CombinedUnawareBatchSourceFunction
         super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         tableScan =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
index e398e09a84..b64518a7ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.compact.MultiTableScanBase;
 import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
 import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -55,7 +56,16 @@ public class CombinedUnawareStreamingSourceFunction
         this.monitorInterval = monitorInterval;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         tableScan =


Reply via email to