This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 76a4134838 [flink] Adopt open(OpenContext) in RichFunction (#4581)
76a4134838 is described below
commit 76a4134838058559fbba2f6a022d7aa3efcd45b0
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 25 20:10:38 2024 +0800
[flink] Adopt open(OpenContext) in RichFunction (#4581)
---
.../flink/api/common/functions/OpenContext.java | 29 ++++++++++++++++++++++
.../flink/api/common/functions/OpenContext.java | 29 ++++++++++++++++++++++
.../flink/api/common/functions/OpenContext.java | 29 ++++++++++++++++++++++
.../flink/api/common/functions/OpenContext.java | 29 ++++++++++++++++++++++
.../paimon/flink/kafka/KafkaSinkFunction.java | 12 ++++++++-
.../cdc/CdcDynamicTableParsingProcessFunction.java | 12 ++++++++-
.../cdc/CdcMultiTableParsingProcessFunction.java | 12 ++++++++-
.../flink/sink/cdc/CdcParsingProcessFunction.java | 12 ++++++++-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 12 ++++++++-
.../paimon/flink/service/QueryAddressRegister.java | 12 ++++++++-
.../paimon/flink/service/QueryFileMonitor.java | 12 ++++++++-
.../apache/paimon/flink/shuffle/RangeShuffle.java | 15 +++++++++--
.../flink/sink/RowDataStoreWriteOperator.java | 28 ++++++++++++++++-----
.../org/apache/paimon/flink/sorter/SortUtils.java | 28 ++++++++++++++++++---
.../flink/source/BucketUnawareCompactSource.java | 12 ++++++++-
.../operator/CombinedAwareBatchSourceFunction.java | 12 ++++++++-
.../CombinedAwareStreamingSourceFunction.java | 12 ++++++++-
.../operator/CombinedCompactorSourceFunction.java | 12 ++++++++-
.../CombinedUnawareBatchSourceFunction.java | 12 ++++++++-
.../CombinedUnawareStreamingSourceFunction.java | 12 ++++++++-
20 files changed, 319 insertions(+), 24 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required
by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty
because it can be
+ * used to add more methods without affecting the signature of {@code
RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required
by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty
because it can be
+ * used to add more methods without affecting the signature of {@code
RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required
by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty
because it can be
+ * used to add more methods without affecting the signature of {@code
RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
new file mode 100644
index 0000000000..4ff5484b3b
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/functions/OpenContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The {@link OpenContext} interface provides necessary information required
by the {@link
+ * RichFunction} when it is opened. The {@link OpenContext} is currently empty
because it can be
+ * used to add more methods without affecting the signature of {@code
RichFunction#open}.
+ */
+@PublicEvolving
+public interface OpenContext {}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
index 72a177adce..41e7141cf4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.kafka;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -65,7 +66,16 @@ public class KafkaSinkFunction extends
FlinkKafkaProducer<SinkRecord> implements
this.writeCallback = writeCallback;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration configuration) throws Exception {
super.open(configuration);
Callback baseCallback = requireNonNull(callback);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 0961ff1600..886e33e204 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.DataField;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -74,7 +75,16 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
this.parserFactory = parserFactory;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
catalog = catalogLoader.load();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index b18a05c280..4c5e0600bb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.types.DataField;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
@@ -51,7 +52,16 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
this.parserFactory = parserFactory;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
updatedDataFieldsOutputTags = new HashMap<>();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 3456634942..eec228f3c0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.types.DataField;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -50,7 +51,16 @@ public class CdcParsingProcessFunction<T> extends
ProcessFunction<T, CdcRecord>
this.parserFactory = parserFactory;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index c2e928bd4a..4f02b784c2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.slf4j.Logger;
@@ -73,7 +74,16 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
this.catalogLoader = catalogLoader;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) {
this.catalog = catalogLoader.load();
this.allowUpperCase = this.catalog.allowUpperCase();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
index df3cf7abf2..524f2e5f01 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -23,6 +23,7 @@ import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -44,7 +45,16 @@ public class QueryAddressRegister extends
RichSinkFunction<InternalRow> {
this.serviceManager = ((FileStoreTable)
table).store().newServiceManager();
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
this.executors = new TreeMap<>();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 43cf654e91..02f8a65411 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -31,6 +31,7 @@ import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.FileMonitorTable;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -70,7 +71,16 @@ public class QueryFileMonitor extends
RichSourceFunction<InternalRow> {
.toMillis();
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable)
table);
ReadBuilder readBuilder = monitorTable.newReadBuilder();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 5410413043..8760f1dc5f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -27,6 +27,7 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -182,9 +183,19 @@ public class RangeShuffle {
this.isSortBySize = isSortBySize;
}
- @Override
+ /**
+             * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink
+ * 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+             * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink
+ * 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
- super.open(parameters);
InternalRowToSizeVisitor internalRowToSizeVisitor = new
InternalRowToSizeVisitor();
fieldSizeCalculator =
rowType.getFieldTypes().stream()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 07fe275543..2b25f07466 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -23,6 +23,8 @@ import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -42,6 +44,8 @@ import
org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
@@ -97,17 +101,29 @@ public class RowDataStoreWriteOperator extends
TableWriteOperator<InternalRow> {
this.sinkContext = new SimpleContext(getProcessingTimeService());
if (logSinkFunction != null) {
- // to stay compatible with Flink 1.18-
- if (logSinkFunction instanceof RichFunction) {
- RichFunction richFunction = (RichFunction) logSinkFunction;
- richFunction.open(new Configuration());
- }
-
+ openFunction(logSinkFunction);
logCallback = new LogWriteCallback();
logSinkFunction.setWriteCallback(logCallback);
}
}
+ private static void openFunction(Function function) throws Exception {
+ if (function instanceof RichFunction) {
+ RichFunction richFunction = (RichFunction) function;
+
+ try {
+ Method method = RichFunction.class.getDeclaredMethod("open",
OpenContext.class);
+ method.invoke(richFunction, new OpenContext() {});
+ return;
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ // to stay compatible with Flink 1.18-
+ }
+
+ Method method = RichFunction.class.getDeclaredMethod("open",
Configuration.class);
+ method.invoke(richFunction, new Configuration());
+ }
+ }
+
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index f590c2fb7f..b30e145512 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -119,9 +120,19 @@ public class SortUtils {
.map(
new RichMapFunction<RowData, Tuple2<KEY,
RowData>>() {
- @Override
+ /**
+ * Do not annotate with
<code>@Override</code> here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext)
throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with
<code>@Override</code> here to maintain
+ * compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters)
throws Exception {
- super.open(parameters);
shuffleKeyAbstract.open();
}
@@ -172,7 +183,18 @@ public class SortUtils {
private transient KeyProjectedRow
keyProjectedRow;
- @Override
+ /**
+                                     * Do not annotate with <code>@Override</code>
here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) {
+ open(new Configuration());
+ }
+
+ /**
+                                     * Do not annotate with <code>@Override</code>
here to maintain
+ * compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) {
keyProjectedRow = new
KeyProjectedRow(valueProjectionMap);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index d306c7d8e1..e768c717dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.utils.Preconditions;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -72,7 +73,16 @@ public class BucketUnawareCompactSource extends
RichSourceFunction<UnawareAppend
this.filter = filter;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
compactionCoordinator =
new UnawareAppendTableCompactionCoordinator(table, streaming,
filter);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index cee6081aa2..2157be51ae 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
@@ -62,7 +63,16 @@ public class CombinedAwareBatchSourceFunction
super(catalogLoader, includingPattern, excludingPattern,
databasePattern, false);
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
index bff690ea30..01e0127e9f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
@@ -59,7 +60,16 @@ public class CombinedAwareStreamingSourceFunction
this.monitorInterval = monitorInterval;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
index 1964927b5c..02bb878650 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.source.Split;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -70,7 +71,16 @@ public abstract class CombinedCompactorSourceFunction<T>
extends RichSourceFunct
this.isStreaming = isStreaming;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
isRunning = new AtomicBoolean(true);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 8ec8d5f2c1..6a40f10ada 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -29,6 +29,7 @@ import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -69,7 +70,16 @@ public class CombinedUnawareBatchSourceFunction
super(catalogLoader, includingPattern, excludingPattern,
databasePattern, false);
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
index e398e09a84..b64518a7ef 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -55,7 +56,16 @@ public class CombinedUnawareStreamingSourceFunction
this.monitorInterval = monitorInterval;
}
- @Override
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public void open(Configuration parameters) throws Exception {
super.open(parameters);
tableScan =