This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 61ee6f8f45 [flink] Avoid deprecated usages about Configuration (#4584)
61ee6f8f45 is described below

commit 61ee6f8f4526603deae5be44fffb8a0168823565
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Nov 26 14:27:22 2024 +0800

    [flink] Avoid deprecated usages about Configuration (#4584)
---
 .../org/apache/paimon/benchmark/QueryRunner.java   |  2 +-
 .../api/common/serialization/SerializerConfig.java | 22 +++++++
 .../common/serialization/SerializerConfigImpl.java | 22 +++++++
 .../api/common/serialization/SerializerConfig.java | 22 +++++++
 .../common/serialization/SerializerConfigImpl.java | 22 +++++++
 .../api/common/serialization/SerializerConfig.java | 22 +++++++
 .../common/serialization/SerializerConfigImpl.java | 22 +++++++
 .../api/common/serialization/SerializerConfig.java | 22 +++++++
 .../common/serialization/SerializerConfigImpl.java | 22 +++++++
 .../KafkaDebeziumAvroDeserializationSchema.java    |  2 +-
 .../cdc/mongodb/strategy/MongoVersionStrategy.java |  8 +--
 .../flink/action/cdc/mysql/MySqlRecordParser.java  | 15 +++--
 .../PulsarDebeziumAvroDeserializationSchema.java   |  2 +-
 .../action/cdc/mongodb/MongodbSchemaITCase.java    | 67 ++++++++++------------
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 10 +++-
 .../flink/sink/cdc/CdcRecordSerializeITCase.java   | 28 +++++++--
 .../compact/changelog/ChangelogTaskTypeInfo.java   | 13 ++++-
 .../paimon/flink/sink/CommittableTypeInfo.java     | 12 +++-
 .../paimon/flink/sink/CompactionTaskTypeInfo.java  | 12 +++-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  7 +--
 .../flink/sink/MultiTableCommittableTypeInfo.java  | 12 +++-
 .../sink/MultiTableCompactionTaskTypeInfo.java     | 13 ++++-
 .../paimon/flink/source/FlinkSourceBuilder.java    | 16 ++----
 .../align/AlignedContinuousFileStoreSource.java    |  2 +-
 .../paimon/flink/utils/InternalTypeInfo.java       | 14 ++++-
 .../apache/paimon/flink/utils/JavaTypeInfo.java    | 16 +++++-
 .../paimon/flink/FileSystemCatalogITCase.java      |  3 +-
 .../paimon/flink/FlinkJobRecoveryITCase.java       |  9 ++-
 .../apache/paimon/flink/RescaleBucketITCase.java   |  4 +-
 .../flink/UnawareBucketAppendOnlyTableITCase.java  | 14 ++++-
 .../paimon/flink/sink/SinkSavepointITCase.java     |  2 +-
 .../apache/paimon/flink/util/AbstractTestBase.java | 14 +++--
 .../paimon/flink/util/ReadWriteTableTestUtil.java  | 26 ++++++---
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  6 +-
 34 files changed, 396 insertions(+), 109 deletions(-)
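
The change is mechanical throughout: Flink's Configuration has deprecated its
type-specific accessors (getString/getBoolean/getInteger over a ConfigOption)
in favor of the generic, typed get/set. A minimal before/after sketch, using a
hypothetical option (SAMPLE_FLAG is illustrative, not part of the commit):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class ConfigMigrationSketch {
        // Illustrative option; any ConfigOption<Boolean> behaves the same.
        static final ConfigOption<Boolean> SAMPLE_FLAG =
                ConfigOptions.key("sample.flag").booleanType().defaultValue(false);

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Deprecated style, removed by this commit:
            //   boolean old = conf.getBoolean(SAMPLE_FLAG);
            // Typed replacement; the option itself carries the value type:
            conf.set(SAMPLE_FLAG, true);
            boolean enabled = conf.get(SAMPLE_FLAG);
            System.out.println(enabled);
        }
    }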

diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
index b07cdef846..8bfe4b6c9c 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
@@ -77,7 +77,7 @@ public class QueryRunner {
 
             String sinkPathConfig =
                     BenchmarkGlobalConfiguration.loadConfiguration()
-                            .getString(BenchmarkOptions.SINK_PATH);
+                            .get(BenchmarkOptions.SINK_PATH);
             if (sinkPathConfig == null) {
                 throw new IllegalArgumentException(
                         BenchmarkOptions.SINK_PATH.key() + " must be set");
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
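
These four identical stub pairs exist because SerializerConfig only appears in
Flink 1.19+. Shared Paimon code now references that type, so each older
per-version module ships an empty stand-in with the same fully qualified name
to satisfy the compiler; at runtime on newer Flink the real class is used. A
sketch of the kind of code this unlocks (MyTypeInfoSketch is illustrative):

    import org.apache.flink.api.common.serialization.SerializerConfig;

    class MyTypeInfoSketch {
        // Compiles against the stub on Flink 1.15-1.18 and against the real
        // interface on Flink 1.19+, since both share the same binary name.
        public String createSerializer(SerializerConfig config) {
            return "serializer";
        }
    }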
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index fc672b9dc0..eea364d460 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -48,7 +48,7 @@ public class KafkaDebeziumAvroDeserializationSchema
 
     public KafkaDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) {
         this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig);
-        this.schemaRegistryUrl = cdcSourceConfig.getString(SCHEMA_REGISTRY_URL);
+        this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 64f1275711..df288a4150 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -83,7 +83,7 @@ public interface MongoVersionStrategy {
             Configuration mongodbConfig)
             throws JsonProcessingException {
         SchemaAcquisitionMode mode =
-                SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
+                SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
         ObjectNode objectNode =
                 JsonSerdeUtil.asSpecificNodeType(jsonNode.asText(), ObjectNode.class);
         JsonNode idNode = objectNode.get(ID_FIELD);
@@ -92,7 +92,7 @@ public interface MongoVersionStrategy {
                     "The provided MongoDB JSON document does not contain an 
_id field.");
         }
         JsonNode document =
-                mongodbConfig.getBoolean(DEFAULT_ID_GENERATION)
+                mongodbConfig.get(DEFAULT_ID_GENERATION)
                         ? objectNode.set(
                                 ID_FIELD,
                                idNode.get(OID_FIELD) == null ? idNode : idNode.get(OID_FIELD))
@@ -101,8 +101,8 @@ public interface MongoVersionStrategy {
             case SPECIFIED:
                 return parseFieldsFromJsonRecord(
                         document.toString(),
-                        mongodbConfig.getString(PARSER_PATH),
-                        mongodbConfig.getString(FIELD_NAME),
+                        mongodbConfig.get(PARSER_PATH),
+                        mongodbConfig.get(FIELD_NAME),
                         computedColumns,
                         rowTypeBuilder);
             case DYNAMIC:
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 502e6237a4..26579e718f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -45,6 +45,8 @@ import io.debezium.relational.history.TableChanges;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import org.apache.flink.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -99,11 +101,14 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
                 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         String stringifyServerTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
 
-        this.isDebeziumSchemaCommentsEnabled =
-                mySqlConfig.getBoolean(
-                        DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
-                                + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
-                        false);
+        ConfigOption<Boolean> includeSchemaCommentsConfig =
+                ConfigOptions.key(
+                                DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+                                        + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
+                                                .name())
+                        .booleanType()
+                        .defaultValue(false);
+        this.isDebeziumSchemaCommentsEnabled = mySqlConfig.get(includeSchemaCommentsConfig);
         this.serverTimeZone =
                 stringifyServerTimeZone == null
                         ? ZoneId.systemDefault()
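
The hunk above shows the pattern for options that only exist as raw string
keys: instead of the deprecated getBoolean(String, boolean), the key is wrapped
into a typed ConfigOption with a default. A reduced sketch of the same idea
(readFlag and its names are illustrative):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    class TypedLookupSketch {
        static boolean readFlag(Configuration conf, String rawKey) {
            ConfigOption<Boolean> option =
                    ConfigOptions.key(rawKey).booleanType().defaultValue(false);
            // Typed read with a default value, no deprecated API involved.
            return conf.get(option);
        }
    }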
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
index b0d1d1bf62..f45ee034be 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
@@ -46,7 +46,7 @@ public class PulsarDebeziumAvroDeserializationSchema
 
     public PulsarDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) {
         this.topic = PulsarActionUtils.findOneTopic(cdcSourceConfig);
-        this.schemaRegistryUrl = cdcSourceConfig.getString(SCHEMA_REGISTRY_URL);
+        this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
index 394cdd1f14..f0328b5663 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -76,13 +76,12 @@ public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
     @Test
     public void testCreateSchemaFromValidConfig() {
         Configuration mongodbConfig = new Configuration();
-        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
-        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
-        mongodbConfig.setString(
-                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
-        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
-        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
-        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+        mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
         Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig);
         assertNotNull(schema);
     }
@@ -90,13 +89,12 @@ public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
     @Test
     public void testCreateSchemaFromInvalidHost() {
         Configuration mongodbConfig = new Configuration();
-        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
-        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
-        mongodbConfig.setString(
-                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
-        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
-        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
-        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+        mongodbConfig.set(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
+        mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
 
         assertThrows(
                 RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
@@ -106,7 +104,7 @@ public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
     public void testCreateSchemaFromIncompleteConfig() {
         // Create a Configuration object with missing necessary settings
         Configuration mongodbConfig = new Configuration();
-        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
         // Expect an exception to be thrown due to missing necessary settings
         assertThrows(
                 NullPointerException.class,
@@ -117,13 +115,12 @@ public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
     public void testCreateSchemaFromDynamicConfig() {
         // Create a Configuration object with the necessary settings
         Configuration mongodbConfig = new Configuration();
-        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
-        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
-        mongodbConfig.setString(
-                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
-        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
-        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
-        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+        mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
 
         // Call the method and check the results
         Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig);
@@ -142,13 +139,12 @@ public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
     @Test
     public void testCreateSchemaFromInvalidDatabase() {
         Configuration mongodbConfig = new Configuration();
-        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
-        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
-        mongodbConfig.setString(
-                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
-        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
-        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "invalidDatabase");
-        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+        mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.set(MongoDBSourceOptions.DATABASE, "invalidDatabase");
+        mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
 
         assertThrows(
                 RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
@@ -157,13 +153,12 @@ public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
     @Test
     public void testCreateSchemaFromInvalidCollection() {
         Configuration mongodbConfig = new Configuration();
-        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
-        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
-        mongodbConfig.setString(
-                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
-        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
-        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
-        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "invalidCollection");
+        mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "invalidCollection");
 
         assertThrows(
                 RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index bdeab07a74..febbe4e1de 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -31,7 +31,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.BeforeAll;
@@ -1285,8 +1286,11 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         mySqlConfig.put("database-name", "default_checkpoint");
         mySqlConfig.put("table-name", "t");
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.noRestart());
+        // Using `none` to avoid compatibility issues with Flink 1.18-.
+        Configuration configuration = new Configuration();
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
         action.withStreamExecutionEnvironment(env);
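
The replaced call was env.setRestartStrategy(RestartStrategies.noRestart()),
an API deprecated in newer Flink. A minimal sketch of the configuration-based
equivalent used above (NoRestartEnvSketch is illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestartStrategyOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class NoRestartEnvSketch {
        static StreamExecutionEnvironment create() {
            Configuration conf = new Configuration();
            // "none" disables restarts, matching RestartStrategies.noRestart().
            conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
            return StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }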
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
index 698900436e..b202ca53c9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
@@ -25,6 +25,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -35,6 +37,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class CdcRecordSerializeITCase {
 
     @Test
-    public void testCdcRecordKryoSerialize() throws IOException {
+    public void testCdcRecordKryoSerialize() throws Exception {
         KryoSerializer<RichCdcMultiplexRecord> kr =
                 createFlinkKryoSerializer(RichCdcMultiplexRecord.class);
         RowType.Builder rowType = RowType.builder();
@@ -78,7 +82,7 @@ public class CdcRecordSerializeITCase {
     }
 
     @Test
-    public void testUnmodifiableListKryoSerialize() throws IOException {
+    public void testUnmodifiableListKryoSerialize() throws Exception {
         KryoSerializer<List> kryoSerializer = createFlinkKryoSerializer(List.class);
         RowType.Builder rowType = RowType.builder();
         rowType.field("id", new BigIntType());
@@ -101,8 +105,24 @@ public class CdcRecordSerializeITCase {
         assertThat(deserializeRecord).isEqualTo(fields);
     }
 
-    public static <T> KryoSerializer<T> createFlinkKryoSerializer(Class<T> type) {
-        return new KryoSerializer<>(type, new ExecutionConfig());
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <T> KryoSerializer<T> createFlinkKryoSerializer(Class<T> type)
+            throws NoSuchMethodException, InvocationTargetException, InstantiationException,
+                    IllegalAccessException {
+        try {
+            Constructor<KryoSerializer> constructor =
+                    KryoSerializer.class.getConstructor(Class.class, SerializerConfig.class);
+            return (KryoSerializer<T>) constructor.newInstance(type, new SerializerConfigImpl());
+        } catch (NoSuchMethodException
+                | InvocationTargetException
+                | IllegalAccessException
+                | InstantiationException e) {
+            // to stay compatible with Flink 1.18-
+        }
+
+        Constructor<KryoSerializer> constructor =
+                KryoSerializer.class.getConstructor(Class.class, ExecutionConfig.class);
+        return (KryoSerializer<T>) constructor.newInstance(type, new ExecutionConfig());
     }
 
     private static final class TestOutputView extends DataOutputStream implements DataOutputView {
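
The reflective lookup above exists because the two KryoSerializer constructors
never coexist: Flink 2.0 drops the (Class, ExecutionConfig) variant and 1.18-
lacks the (Class, SerializerConfig) one, so referencing either directly would
fail to compile at one end of the supported range. The same trick in
miniature, on a JDK class so it runs anywhere (here both constructors happen
to exist; in the Flink case only one does):

    import java.lang.reflect.Constructor;

    class ReflectiveCtorSketch {
        static StringBuilder build(String seed) throws Exception {
            try {
                // Prefer the richer constructor if the runtime has it.
                Constructor<StringBuilder> c =
                        StringBuilder.class.getConstructor(String.class);
                return c.newInstance(seed);
            } catch (NoSuchMethodException e) {
                // Fall back to the older shape.
                return StringBuilder.class.getConstructor().newInstance();
            }
        }
    }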
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
index 5cae899a07..a529e6764f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.compact.changelog;
 import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -56,7 +57,17 @@ public class ChangelogTaskTypeInfo extends TypeInformation<ChangelogCompactTask>
         return false;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<ChangelogCompactTask> createSerializer(
+            SerializerConfig serializerConfig) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public TypeSerializer<ChangelogCompactTask> createSerializer(ExecutionConfig config) {
         // we don't need copy for task
         return new NoneCopyVersionedSerializerTypeSerializerProxy<ChangelogCompactTask>(
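
This dual-method shape repeats in the TypeInformation subclasses below. The
reason for dropping @Override: on Flink 1.18- the superclass declares only
createSerializer(ExecutionConfig), on Flink 2.0+ only
createSerializer(SerializerConfig), so annotating either method would break
compilation at one end of the range. A Flink-free illustration (Base stands in
for one version's TypeInformation; all names are illustrative):

    abstract class Base {
        // Pretend this is the only variant this Flink version declares.
        public abstract String createSerializer(Integer executionConfig);
    }

    class Impl extends Base {
        // Extra overload for the other Flink version; on this Base it is a
        // plain method, so @Override here would be a compile error.
        public String createSerializer(Long serializerConfig) {
            return this.createSerializer((Integer) null);
        }

        // Overrides Base's variant; compiles fine without the annotation.
        public String createSerializer(Integer executionConfig) {
            return "serializer";
        }
    }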
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
index dcb87238b8..92e826a913 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -57,7 +58,16 @@ public class CommittableTypeInfo extends TypeInformation<Committable> {
         return false;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<Committable> createSerializer(SerializerConfig config) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public TypeSerializer<Committable> createSerializer(ExecutionConfig config) {
         // no copy, so that data from writer is directly going into committer while chaining
         return new NoneCopyVersionedSerializerTypeSerializerProxy<Committable>(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
index 47defa61a9..6510a85b80 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.table.sink.CompactionTaskSerializer;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -58,7 +59,16 @@ public class CompactionTaskTypeInfo extends TypeInformation<UnawareAppendCompact
         return false;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<UnawareAppendCompactionTask> createSerializer(SerializerConfig config) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public TypeSerializer<UnawareAppendCompactionTask> createSerializer(ExecutionConfig config) {
         // we don't need copy for task
         return new NoneCopyVersionedSerializerTypeSerializerProxy<UnawareAppendCompactionTask>(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 59f2f4b103..dd364c196d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -338,13 +337,11 @@ public abstract class FlinkSink<T> implements Serializable {
         checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "Paimon sink currently does not support unaligned checkpoints. 
Please set "
-                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
-                        + " to false.");
+                        + "execution.checkpointing.unaligned.enabled to 
false.");
         checkArgument(
                 env.getCheckpointConfig().getCheckpointingMode() == 
CheckpointingMode.EXACTLY_ONCE,
                 "Paimon sink currently only supports EXACTLY_ONCE checkpoint 
mode. Please set "
-                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
-                        + " to exactly-once");
+                        + "execution.checkpointing.mode to exactly-once");
     }
 
     public static void assertBatchAdaptiveParallelism(
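
Here the deprecated ExecutionCheckpointingOptions constants give way to the
literal option keys, which stay stable across the Flink versions Paimon
supports even though the constants class moved. A sketch of the resulting
guard (assertAligned is illustrative):

    import static org.apache.flink.util.Preconditions.checkArgument;

    class CheckpointGuardSketch {
        static void assertAligned(boolean unalignedEnabled) {
            checkArgument(
                    !unalignedEnabled,
                    "Paimon sink currently does not support unaligned checkpoints. Please set "
                            + "execution.checkpointing.unaligned.enabled to false.");
        }
    }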
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
index f82f082098..7da0ae0e20 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -57,7 +58,16 @@ public class MultiTableCommittableTypeInfo extends TypeInformation<MultiTableCom
         return false;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<MultiTableCommittable> createSerializer(SerializerConfig config) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public TypeSerializer<MultiTableCommittable> createSerializer(ExecutionConfig config) {
         // no copy, so that data from writer is directly going into committer while chaining
         return new NoneCopyVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
index f27f29f87f..0116ff1988 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.table.sink.MultiTableCompactionTaskSerializer;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
@@ -60,7 +61,17 @@ public class MultiTableCompactionTaskTypeInfo
         return false;
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<MultiTableUnawareAppendCompactionTask> createSerializer(
+            SerializerConfig serializerConfig) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public TypeSerializer<MultiTableUnawareAppendCompactionTask> createSerializer(
             ExecutionConfig executionConfig) {
         return new SimpleVersionedSerializerTypeSerializerProxy<
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index b3dcd4840c..e864ec0500 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -331,30 +330,25 @@ public class FlinkSourceBuilder {
         checkArgument(
                 checkpointConfig.isCheckpointingEnabled(),
                 "The align mode of paimon source is only supported when 
checkpoint enabled. Please set "
-                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key()
-                        + "larger than 0");
+                        + "execution.checkpointing.interval larger than 0");
         checkArgument(
                 checkpointConfig.getMaxConcurrentCheckpoints() == 1,
                 "The align mode of paimon source supports at most one ongoing 
checkpoint at the same time. Please set "
-                        + 
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key()
-                        + " to 1");
+                        + "execution.checkpointing.max-concurrent-checkpoints 
to 1");
         checkArgument(
                 checkpointConfig.getCheckpointTimeout()
                         > 
conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT)
                                 .toMillis(),
                 "The align mode of paimon source requires that the timeout of 
checkpoint is greater than the timeout of the source's snapshot alignment. 
Please increase "
-                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()
-                        + " or decrease "
+                        + "execution.checkpointing.timeout or decrease "
                         + 
FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key());
         checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "The align mode of paimon source currently does not support 
unaligned checkpoints. Please set "
-                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
-                        + " to false.");
+                        + "execution.checkpointing.unaligned.enabled to 
false.");
         checkArgument(
                 env.getCheckpointConfig().getCheckpointingMode() == 
CheckpointingMode.EXACTLY_ONCE,
                 "The align mode of paimon source currently only supports 
EXACTLY_ONCE checkpoint mode. Please set "
-                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
-                        + " to exactly-once");
+                        + "execution.checkpointing.mode to exactly-once");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index d6b7060763..705e1d9a7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -73,7 +73,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
                 limit,
                 new FutureCompletingBlockingQueue<>(
                         context.getConfiguration()
-                                
.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
+                                
.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
index 4ea5db9f34..60898421dd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -73,8 +74,17 @@ public class InternalTypeInfo<T> extends TypeInformation<T> {
         return false;
     }
 
-    @Override
-    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<T> createSerializer(SerializerConfig config) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
+    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
         return serializer.duplicate();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
index a36243c5bd..4aea809b51 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.utils;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -78,7 +79,16 @@ public class JavaTypeInfo<T extends Serializable> extends TypeInformation<T>
         return Comparable.class.isAssignableFrom(typeClass);
     }
 
-    @Override
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<T> createSerializer(SerializerConfig config) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
     public TypeSerializer<T> createSerializer(ExecutionConfig config) {
         return new JavaSerializer<>(this.typeClass);
     }
@@ -91,7 +101,9 @@ public class JavaTypeInfo<T extends Serializable> extends TypeInformation<T>
             @SuppressWarnings("rawtypes")
             GenericTypeComparator comparator =
                     new GenericTypeComparator(
-                            sortOrderAscending, createSerializer(executionConfig), this.typeClass);
+                            sortOrderAscending,
+                            new JavaSerializer<>(this.typeClass),
+                            this.typeClass);
             return (TypeComparator<T>) comparator;
         }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 239043ff79..915c93680a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.BlockingIterator;
 
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -60,7 +59,7 @@ public class FileSystemCatalogITCase extends AbstractTestBase {
                 tableEnvironmentBuilder()
                         .streamingMode()
                         .parallelism(1)
-                        .setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
+                        .setString("execution.checkpointing.unaligned.enabled", "false")
                         .build();
         path = getTempDirPath();
         tEnv.executeSql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
index c46c4c3589..8df379a71b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -65,7 +65,7 @@ public class FlinkJobRecoveryITCase extends CatalogITCaseBase {
                 .set(
                         CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
                         ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
-                .removeConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL);
+                .removeKey("execution.checkpointing.interval");
 
         // insert source data
         batchSql("INSERT INTO source_table1 VALUES (1, 'test-1', '20241030')");
@@ -219,10 +219,9 @@ public class FlinkJobRecoveryITCase extends CatalogITCaseBase {
             batchSql(sql);
         }
 
-        Configuration config =
-                sEnv.getConfig()
-                        .getConfiguration()
-                        .set(StateRecoveryOptions.SAVEPOINT_PATH, checkpointPath);
+        Configuration config = sEnv.getConfig().getConfiguration();
+        // use config string to stay compatible with flink 1.19-
+        config.setString("execution.state-recovery.path", checkpointPath);
         for (Map.Entry<String, String> entry : recoverOptions.entrySet()) {
             config.setString(entry.getKey(), entry.getValue());
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index 08969bddfd..d5747d2e28 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -26,7 +26,6 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
@@ -106,9 +105,10 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData);
 
         // step5: resume streaming job
+        // use a config string to stay compatible with Flink 1.19 and earlier
         sEnv.getConfig()
                 .getConfiguration()
-                .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+                .setString("execution.state-recovery.path", savepointPath);
         JobClient resumedJobClient =
                 startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
         // stop job
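
For completeness, here is how resuming from a savepoint looks with the string key in a table program. Environment setup and the savepoint path are illustrative; the point is that no SavepointConfigOptions / StateRecoveryOptions import is needed, since that class lives in different places across Flink releases.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class ResumeSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            // Raw key instead of a version-specific ConfigOption constant.
            tEnv.getConfig()
                    .getConfiguration()
                    .setString("execution.state-recovery.path", "file:///tmp/savepoint-1");
        }
    }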
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index f6dfb1b230..6ca78b088f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -50,7 +51,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
-import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -203,7 +203,11 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
         batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
         batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");
 
-        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+        sEnv.getConfig()
+                .getConfiguration()
+                .setString(
+                        "execution.checkpointing.interval",
+                        TimeUtils.formatWithHighestUnit(Duration.ofMillis(500)));
         sEnv.executeSql(
                 "CREATE TEMPORARY TABLE Orders_in (\n"
                         + "    f0        INT,\n"
@@ -224,7 +228,11 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
         batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
         batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");
 
-        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+        sEnv.getConfig()
+                .getConfiguration()
+                .setString(
+                        "execution.checkpointing.interval",
+                        TimeUtils.formatWithHighestUnit(Duration.ofMillis(500)));
         sEnv.executeSql(
                 "CREATE TEMPORARY TABLE Orders_in (\n"
                         + "    f0        INT,\n"
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index 6b912d2e57..b1486deacb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -137,7 +137,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
                         .parallelism(1)
                         .allowRestart()
                         .setConf(conf)
-                        .setConf(StateBackendOptions.STATE_BACKEND, "filesystem")
+                        .setConf(StateBackendOptions.STATE_BACKEND, "hashmap")
                         .setConf(
                                 CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                                 "file://" + path + "/checkpoint")
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index ce0017eb18..ee838ed682 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.util;
 
 import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.dag.Transformation;
@@ -29,7 +30,6 @@ import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
@@ -164,6 +164,11 @@ public class AbstractTestBase {
             return this;
         }
 
+        public TableEnvironmentBuilder setString(String key, String value) {
+            conf.setString(key, value);
+            return this;
+        }
+
         public TableEnvironmentBuilder setConf(Configuration conf) {
             this.conf.addAll(conf);
             return this;
@@ -182,9 +187,10 @@ public class AbstractTestBase {
                 if (checkpointIntervalMs != null) {
                     tEnv.getConfig()
                             .getConfiguration()
-                            .set(
-                                    ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
-                                    Duration.ofMillis(checkpointIntervalMs));
+                            .setString(
+                                    "execution.checkpointing.interval",
+                                    TimeUtils.formatWithHighestUnit(
+                                            Duration.ofMillis(checkpointIntervalMs)));
                 }
             } else {
                 tEnv =
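
A usage sketch for the new setString(...) builder hook, mirroring the FileSystemCatalogITCase hunk earlier in this patch. It assumes tableEnvironmentBuilder() is visible to test subclasses, as the existing tests suggest; the class name is illustrative.

    import org.apache.paimon.flink.util.AbstractTestBase;

    import org.apache.flink.table.api.TableEnvironment;

    public class SetStringUsageSketch extends AbstractTestBase {
        TableEnvironment streamingEnv() {
            return tableEnvironmentBuilder()
                    .streamingMode()
                    .parallelism(1)
                    // Raw key, so no version-specific ConfigOption import is needed.
                    .setString("execution.checkpointing.unaligned.enabled", "false")
                    .build();
        }
    }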
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 86b0014eb3..9c3170f9a9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -23,8 +23,9 @@ import org.apache.paimon.flink.ReadWriteTableITCase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
@@ -75,12 +76,11 @@ public class ReadWriteTableTestUtil {
     }
 
     public static void init(String warehouse, int parallelism) {
-        StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism);
-        sExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        // Use the `none` restart strategy to avoid compatibility issues with Flink 1.18 and earlier.
+        StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism, "none");
         sEnv = StreamTableEnvironment.create(sExeEnv);
 
-        bExeEnv = buildBatchEnv(parallelism);
-        bExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        bExeEnv = buildBatchEnv(parallelism, "none");
         bEnv = StreamTableEnvironment.create(bExeEnv, EnvironmentSettings.inBatchMode());
 
         ReadWriteTableTestUtil.warehouse = warehouse;
@@ -95,16 +95,24 @@ public class ReadWriteTableTestUtil {
         bEnv.useCatalog(catalog);
     }
 
-    public static StreamExecutionEnvironment buildStreamEnv(int parallelism) {
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    public static StreamExecutionEnvironment buildStreamEnv(
+            int parallelism, String restartStrategy) {
+        Configuration configuration = new Configuration();
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, restartStrategy);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         env.enableCheckpointing(100);
         env.setParallelism(parallelism);
         return env;
     }
 
-    public static StreamExecutionEnvironment buildBatchEnv(int parallelism) {
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    public static StreamExecutionEnvironment buildBatchEnv(
+            int parallelism, String restartStrategy) {
+        Configuration configuration = new Configuration();
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, restartStrategy);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setParallelism(parallelism);
         return env;
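
The restart strategy now travels through Configuration at environment construction, replacing the deprecated RestartStrategies API. A standalone sketch of that pattern; "none" matches the old noRestart() semantics, and the class name is illustrative.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestartStrategyOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartStrategySketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Disable restarts via the typed option instead of RestartStrategies.noRestart().
            conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setParallelism(2);
            System.out.println(env.getParallelism());
        }
    }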
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 74d2d7e1c3..2266a8484d 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -31,12 +31,12 @@ import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.TimeUtils;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -139,7 +139,9 @@ public abstract class HiveCatalogITCaseBase {
                         EnvironmentSettings.newInstance().inStreamingMode().build());
         sEnv.getConfig()
                 .getConfiguration()
-                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1));
+                .setString(
+                        "execution.checkpointing.interval",
+                        TimeUtils.formatWithHighestUnit(Duration.ofSeconds(1)));
         sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
         tEnv.executeSql(
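
One last sketch tying the two styles together, as in the hunk above: options whose holder class was removed go by string key, while options that still exist (TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM) keep the typed ConfigOption form. Class name is illustrative.

    import org.apache.paimon.utils.TimeUtils;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    import java.time.Duration;

    public class MixedConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Removed holder class: fall back to the string key.
            conf.setString(
                    "execution.checkpointing.interval",
                    TimeUtils.formatWithHighestUnit(Duration.ofSeconds(1)));
            // Stable option: keep the typed form.
            conf.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
            System.out.println(conf);
        }
    }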
