This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 96587abf7f Flink: Remove MiniClusterResource (#10817)
96587abf7f is described below
commit 96587abf7fbbeb9c728d60cbdc7bdd7e2096dad5
Author: Tom Tanaka <[email protected]>
AuthorDate: Wed Jul 31 00:18:39 2024 +0900
Flink: Remove MiniClusterResource (#10817)
---
.../apache/iceberg/flink/MiniClusterResource.java | 53 -----------------
.../java/org/apache/iceberg/flink/TestBase.java | 2 +-
.../apache/iceberg/flink/TestFlinkTableSink.java | 2 +-
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 2 +-
.../apache/iceberg/flink/TestIcebergConnector.java | 2 +-
.../TestBucketPartitionerFlinkIcebergSink.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 7 +--
.../flink/sink/TestFlinkIcebergSinkBranch.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 5 +-
.../flink/sink/TestFlinkIcebergSinkV2Branch.java | 4 +-
.../flink/source/ChangeLogTableTestBase.java | 4 +-
.../apache/iceberg/flink/source/TestFlinkScan.java | 2 +-
.../TestIcebergSourceWithWatermarkExtractor.java | 2 +-
.../apache/iceberg/flink/source/TestSqlBase.java | 2 +-
.../iceberg/flink/source/TestStreamScanSql.java | 4 +-
.../apache/iceberg/flink/MiniClusterResource.java | 68 ----------------------
.../java/org/apache/iceberg/flink/TestBase.java | 2 +-
.../apache/iceberg/flink/TestFlinkTableSink.java | 2 +-
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 2 +-
.../apache/iceberg/flink/TestIcebergConnector.java | 2 +-
.../TestBucketPartitionerFlinkIcebergSink.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 7 +--
.../flink/sink/TestFlinkIcebergSinkBranch.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 5 +-
.../flink/sink/TestFlinkIcebergSinkV2Branch.java | 4 +-
.../flink/source/ChangeLogTableTestBase.java | 4 +-
.../apache/iceberg/flink/source/TestFlinkScan.java | 2 +-
.../flink/source/TestIcebergSourceContinuous.java | 2 +-
.../TestIcebergSourceWithWatermarkExtractor.java | 2 +-
.../apache/iceberg/flink/source/TestSqlBase.java | 2 +-
.../iceberg/flink/source/TestStreamScanSql.java | 4 +-
.../apache/iceberg/flink/MiniClusterResource.java | 68 ----------------------
.../java/org/apache/iceberg/flink/TestBase.java | 2 +-
.../apache/iceberg/flink/TestFlinkTableSink.java | 2 +-
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 2 +-
.../apache/iceberg/flink/TestIcebergConnector.java | 2 +-
.../maintenance/operator/OperatorTestBase.java | 4 +-
.../TestBucketPartitionerFlinkIcebergSink.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 7 +--
.../flink/sink/TestFlinkIcebergSinkBranch.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 5 +-
.../flink/sink/TestFlinkIcebergSinkV2Branch.java | 4 +-
.../flink/source/ChangeLogTableTestBase.java | 4 +-
.../apache/iceberg/flink/source/TestFlinkScan.java | 2 +-
.../TestIcebergSourceWithWatermarkExtractor.java | 2 +-
.../TestIcebergSpeculativeExecutionSupport.java | 2 +-
.../apache/iceberg/flink/source/TestSqlBase.java | 2 +-
.../iceberg/flink/source/TestStreamScanSql.java | 4 +-
48 files changed, 70 insertions(+), 265 deletions(-)
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
deleted file mode 100644
index 45af9241b7..0000000000
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-
-public class MiniClusterResource {
-
- private static final int DEFAULT_TM_NUM = 1;
- private static final int DEFAULT_PARALLELISM = 4;
-
- public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
- new Configuration()
- // disable classloader check as Avro may cache class/object in the
serializers.
- .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
-
- private MiniClusterResource() {}
-
- /**
- * It will start a mini cluster with
classloader.check-leaked-classloader=false, so that we won't
- * break the unit tests because of the class loader leak issue. In our
iceberg integration tests,
- * there're some that will assert the results after finished the flink jobs,
so actually we may
- * access the class loader that has been closed by the flink task managers
if we enable the switch
- * classloader.check-leaked-classloader by default.
- */
- public static MiniClusterWithClientResource
createWithClassloaderCheckDisabled() {
- return new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(DEFAULT_TM_NUM)
- .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
- .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
- .build());
- }
-}
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 6367a064f2..a74226092f 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -43,7 +43,7 @@ import org.junit.jupiter.api.io.TempDir;
public abstract class TestBase extends TestBaseUtils {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@TempDir protected Path temporaryDirectory;
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index b7fce104f4..b73300e3f1 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -91,7 +91,7 @@ public class TestFlinkTableSink extends CatalogTestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index 5674c83e40..d52d54e159 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -75,7 +75,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 013b98e3b8..b709c0058f 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -176,7 +176,7 @@ public class TestIcebergConnector extends TestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
index dc3eb93280..ba0ea867ff 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,7 +63,7 @@ public class TestBucketPartitionerFlinkIcebergSink {
private static final int SLOTS_PER_TASK_MANAGER = 8;
@RegisterExtension
- private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUMBER_TASK_MANAGERS)
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 8cad35c859..61ab087f2c 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -62,7 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
@@ -113,7 +112,7 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
@@ -271,7 +270,7 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
index 3edaafca0e..441b5ed2a4 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -36,7 +36,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
@@ -88,7 +88,7 @@ public class TestFlinkIcebergSinkBranch extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100);
tableLoader = CATALOG_EXTENSION.tableLoader();
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 55909874cc..577c54976b 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -38,7 +38,6 @@ import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
@@ -57,7 +56,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
@Timeout(value = 60)
public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
@@ -89,7 +88,7 @@ public class TestFlinkIcebergSinkV2 extends
TestFlinkIcebergSinkV2Base {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100L)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
index ffeab67338..0b0c55f51c 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -31,7 +31,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -71,7 +71,7 @@ public class TestFlinkIcebergSinkV2Branch extends
TestFlinkIcebergSinkV2Base {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100);
tableLoader = CATALOG_EXTENSION.tableLoader();
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
index d3748e008b..5dfbbe3abe 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.TestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -61,7 +61,7 @@ public class ChangeLogTableTestBase extends TestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(400)
.setMaxParallelism(1)
.setParallelism(1);
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 049ddf9e3f..cf6b233dce 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -64,7 +64,7 @@ import org.junit.jupiter.api.io.TempDir;
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestFlinkScan {
@RegisterExtension
- protected static MiniClusterExtension miniClusterResource =
+ protected static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@TempDir protected Path temporaryDirectory;
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index 5e6a2b3cae..70889f4f76 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.source;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import java.io.Serializable;
import java.nio.file.Path;
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index 8013bce3f4..f9b776397c 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -52,7 +52,7 @@ import org.junit.jupiter.api.io.TempDir;
/** Test other more advanced usage of SQL. They don't need to run for every
file format. */
public abstract class TestSqlBase {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index d6cf679127..57ee7baf20 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -42,7 +42,7 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.CatalogTestBase;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
@@ -70,7 +70,7 @@ public class TestStreamScanSql extends CatalogTestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
StreamTableEnvironment streamTableEnv =
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
deleted file mode 100644
index 399d7aaff6..0000000000
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.testutils.InMemoryReporter;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-
-public class MiniClusterResource {
-
- private static final int DEFAULT_TM_NUM = 1;
- private static final int DEFAULT_PARALLELISM = 4;
-
- public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
- new Configuration()
- // disable classloader check as Avro may cache class/object in the
serializers.
- .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
-
- private MiniClusterResource() {}
-
- /**
- * It will start a mini cluster with
classloader.check-leaked-classloader=false, so that we won't
- * break the unit tests because of the class loader leak issue. In our
iceberg integration tests,
- * there're some that will assert the results after finished the flink jobs,
so actually we may
- * access the class loader that has been closed by the flink task managers
if we enable the switch
- * classloader.check-leaked-classloader by default.
- */
- public static MiniClusterWithClientResource
createWithClassloaderCheckDisabled() {
- return new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(DEFAULT_TM_NUM)
- .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
- .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
- .build());
- }
-
- public static MiniClusterWithClientResource
createWithClassloaderCheckDisabled(
- InMemoryReporter inMemoryReporter) {
- Configuration configuration =
- new
Configuration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
- inMemoryReporter.addToConfiguration(configuration);
-
- return new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(MiniClusterResource.DEFAULT_TM_NUM)
-
.setNumberSlotsPerTaskManager(MiniClusterResource.DEFAULT_PARALLELISM)
- .setConfiguration(configuration)
- .build());
- }
-}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 6367a064f2..a74226092f 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -43,7 +43,7 @@ import org.junit.jupiter.api.io.TempDir;
public abstract class TestBase extends TestBaseUtils {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@TempDir protected Path temporaryDirectory;
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index b7fce104f4..b73300e3f1 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -91,7 +91,7 @@ public class TestFlinkTableSink extends CatalogTestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index 5674c83e40..d52d54e159 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -75,7 +75,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 013b98e3b8..b709c0058f 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -176,7 +176,7 @@ public class TestIcebergConnector extends TestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
index dc3eb93280..ba0ea867ff 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,7 +63,7 @@ public class TestBucketPartitionerFlinkIcebergSink {
private static final int SLOTS_PER_TASK_MANAGER = 8;
@RegisterExtension
- private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUMBER_TASK_MANAGERS)
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 8cad35c859..61ab087f2c 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -62,7 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
@@ -113,7 +112,7 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
@@ -271,7 +270,7 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
index 3edaafca0e..441b5ed2a4 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -36,7 +36,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
@@ -88,7 +88,7 @@ public class TestFlinkIcebergSinkBranch extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100);
tableLoader = CATALOG_EXTENSION.tableLoader();
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 55909874cc..577c54976b 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -38,7 +38,6 @@ import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
@@ -57,7 +56,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
@Timeout(value = 60)
public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
@@ -89,7 +88,7 @@ public class TestFlinkIcebergSinkV2 extends
TestFlinkIcebergSinkV2Base {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100L)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
index ffeab67338..0b0c55f51c 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -31,7 +31,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -71,7 +71,7 @@ public class TestFlinkIcebergSinkV2Branch extends
TestFlinkIcebergSinkV2Base {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100);
tableLoader = CATALOG_EXTENSION.tableLoader();
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
index d3748e008b..5dfbbe3abe 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.TestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -61,7 +61,7 @@ public class ChangeLogTableTestBase extends TestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(400)
.setMaxParallelism(1)
.setParallelism(1);
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 049ddf9e3f..cf6b233dce 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -64,7 +64,7 @@ import org.junit.jupiter.api.io.TempDir;
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestFlinkScan {
@RegisterExtension
- protected static MiniClusterExtension miniClusterResource =
+ protected static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@TempDir protected Path temporaryDirectory;
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 749cbf8933..e0e2bf5e61 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -67,7 +67,7 @@ public class TestIcebergSourceContinuous {
@TempDir protected Path temporaryFolder;
@RegisterExtension
- public static MiniClusterExtension miniClusterExtension =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled(METRIC_REPORTER);
@RegisterExtension
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index 5e6a2b3cae..70889f4f76 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.source;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import java.io.Serializable;
import java.nio.file.Path;
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index 8013bce3f4..f9b776397c 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -52,7 +52,7 @@ import org.junit.jupiter.api.io.TempDir;
/** Test other more advanced usage of SQL. They don't need to run for every
file format. */
public abstract class TestSqlBase {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index d6cf679127..57ee7baf20 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -42,7 +42,7 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.CatalogTestBase;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
@@ -70,7 +70,7 @@ public class TestStreamScanSql extends CatalogTestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
StreamTableEnvironment streamTableEnv =
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
deleted file mode 100644
index 399d7aaff6..0000000000
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.testutils.InMemoryReporter;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-
-public class MiniClusterResource {
-
- private static final int DEFAULT_TM_NUM = 1;
- private static final int DEFAULT_PARALLELISM = 4;
-
- public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
- new Configuration()
- // disable classloader check as Avro may cache class/object in the
serializers.
- .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
-
- private MiniClusterResource() {}
-
- /**
- * It will start a mini cluster with
classloader.check-leaked-classloader=false, so that we won't
- * break the unit tests because of the class loader leak issue. In our
iceberg integration tests,
- * there're some that will assert the results after finished the flink jobs,
so actually we may
- * access the class loader that has been closed by the flink task managers
if we enable the switch
- * classloader.check-leaked-classloader by default.
- */
- public static MiniClusterWithClientResource
createWithClassloaderCheckDisabled() {
- return new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(DEFAULT_TM_NUM)
- .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
- .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
- .build());
- }
-
- public static MiniClusterWithClientResource
createWithClassloaderCheckDisabled(
- InMemoryReporter inMemoryReporter) {
- Configuration configuration =
- new
Configuration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
- inMemoryReporter.addToConfiguration(configuration);
-
- return new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(MiniClusterResource.DEFAULT_TM_NUM)
-
.setNumberSlotsPerTaskManager(MiniClusterResource.DEFAULT_PARALLELISM)
- .setConfiguration(configuration)
- .build());
- }
-}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 773d22e19e..6336900446 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -44,7 +44,7 @@ import org.junit.jupiter.api.io.TempDir;
public abstract class TestBase extends TestBaseUtils {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@TempDir protected Path temporaryDirectory;
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 3f66174049..a0341e6834 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -91,7 +91,7 @@ public class TestFlinkTableSink extends CatalogTestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index baf13017ff..c5becb6cac 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -75,7 +75,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index c978ec6f8b..fdb0e0cf19 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -176,7 +176,7 @@ public class TestIcebergConnector extends TestBase {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 269ae681b0..272e0b693f 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.maintenance.operator;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -34,7 +34,7 @@ class OperatorTestBase {
static final String TABLE_NAME = "test_table";
@RegisterExtension
- protected static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ protected static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUMBER_TASK_MANAGERS)
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
index dc3eb93280..ba0ea867ff 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER;
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,7 +63,7 @@ public class TestBucketPartitionerFlinkIcebergSink {
private static final int SLOTS_PER_TASK_MANAGER = 8;
@RegisterExtension
- private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUMBER_TASK_MANAGERS)
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 8cad35c859..61ab087f2c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -62,7 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
@@ -113,7 +112,7 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
@@ -271,7 +270,7 @@ public class TestFlinkIcebergSink extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
index 3edaafca0e..441b5ed2a4 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -36,7 +36,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
@@ -88,7 +88,7 @@ public class TestFlinkIcebergSinkBranch extends
TestFlinkIcebergSinkBase {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100);
tableLoader = CATALOG_EXTENSION.tableLoader();
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 55909874cc..577c54976b 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -38,7 +38,6 @@ import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
@@ -57,7 +56,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
@Timeout(value = 60)
public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
@@ -89,7 +88,7 @@ public class TestFlinkIcebergSinkV2 extends
TestFlinkIcebergSinkV2Base {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100L)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
index ffeab67338..0b0c55f51c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -31,7 +31,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -71,7 +71,7 @@ public class TestFlinkIcebergSinkV2Branch extends
TestFlinkIcebergSinkV2Base {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100);
tableLoader = CATALOG_EXTENSION.tableLoader();
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
index d3748e008b..5dfbbe3abe 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.TestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -61,7 +61,7 @@ public class ChangeLogTableTestBase extends TestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(400)
.setMaxParallelism(1)
.setParallelism(1);
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 049ddf9e3f..cf6b233dce 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -64,7 +64,7 @@ import org.junit.jupiter.api.io.TempDir;
@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestFlinkScan {
@RegisterExtension
- protected static MiniClusterExtension miniClusterResource =
+ protected static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@TempDir protected Path temporaryDirectory;
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index 5e6a2b3cae..70889f4f76 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.source;
-import static
org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static
org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
import java.io.Serializable;
import java.nio.file.Path;
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 3285a16a12..b21010a91b 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -55,7 +55,7 @@ public class TestIcebergSpeculativeExecutionSupport extends
TestBase {
private static final int NUM_TASK_SLOTS = 3;
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUM_TASK_MANAGERS)
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index 8013bce3f4..f9b776397c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -52,7 +52,7 @@ import org.junit.jupiter.api.io.TempDir;
/** Test other more advanced usage of SQL. They don't need to run for every
file format. */
public abstract class TestSqlBase {
@RegisterExtension
- public static MiniClusterExtension miniClusterResource =
+ public static MiniClusterExtension miniClusterExtension =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
@RegisterExtension
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index fcf5c1479d..97ed4ca1e9 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -42,7 +42,7 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.CatalogTestBase;
-import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
@@ -70,7 +70,7 @@ public class TestStreamScanSql extends CatalogTestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
StreamTableEnvironment streamTableEnv =