This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 576354cf44a41a1f8d7c0d12801c91badc1e9fcf
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Feb 24 14:23:38 2022 +0100

    [hotfix][tests] Restructure AbstractHAJobRunITCase
    
    Refactors the test to make use of a static MiniClusterExtension, in 
preparation for FLINK-26252.
---
 flink-filesystems/flink-s3-fs-base/pom.xml         |  6 ++++
 .../fs/s3/common/HAJobRunOnMinioS3StoreITCase.java | 41 +++++++++++++++++-----
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |  6 ++++
 flink-filesystems/flink-s3-fs-presto/pom.xml       |  6 ++++
 .../highavailability/AbstractHAJobRunITCase.java   | 40 +++++----------------
 5 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml 
b/flink-filesystems/flink-s3-fs-base/pom.xml
index 0b4a337..9ccf84e 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -251,6 +251,12 @@ under the License.
                                </exclusion>
                        </exclusions>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
index c8f358d..ebe71da 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
@@ -20,18 +20,25 @@ package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.core.testutils.TestContainerExtension;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase;
 import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
@@ -51,10 +58,29 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends 
AbstractHAJobRunITCas
     private static final String JOB_RESULT_STORE_FOLDER = "jrs";
 
     @RegisterExtension
+    @Order(2)
     private static final 
AllCallbackWrapper<TestContainerExtension<MinioTestContainer>>
             MINIO_EXTENSION =
                     new AllCallbackWrapper<>(new 
TestContainerExtension<>(MinioTestContainer::new));
 
+    @RegisterExtension
+    @Order(3)
+    private static final EachCallbackWrapper<MiniClusterExtension> 
miniClusterExtension =
+            new EachCallbackWrapper<>(
+                    new MiniClusterExtension(
+                            () -> {
+                                final Configuration configuration = 
createConfiguration();
+                                FileSystem.initialize(configuration, null);
+                                return new 
MiniClusterResourceConfiguration.Builder()
+                                        .setConfiguration(configuration)
+                                        .build();
+                            }));
+
+    @Override
+    public MiniCluster getMiniCluster() {
+        return miniClusterExtension.getCustomExtension().getMiniCluster();
+    }
+
     private static MinioTestContainer getMinioContainer() {
         return MINIO_EXTENSION.getCustomExtension().getTestContainer();
     }
@@ -77,13 +103,7 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends 
AbstractHAJobRunITCas
         return pathSeparator + StringUtils.join(subfolders, pathSeparator);
     }
 
-    @Override
-    protected String getHAStoragePath() {
-        return createS3URIWithSubPath(CLUSTER_ID);
-    }
-
-    @Override
-    protected Configuration createConfiguration() {
+    private static Configuration createConfiguration() {
         final Configuration config = new Configuration();
 
         getMinioContainer().setS3ConfigOptions(config);
@@ -94,7 +114,12 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends 
AbstractHAJobRunITCas
                 JobResultStoreOptions.STORAGE_PATH,
                 createS3URIWithSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER));
 
-        return config;
+        return addHaConfiguration(config, createS3URIWithSubPath(CLUSTER_ID));
+    }
+
+    @AfterAll
+    public static void unsetFileSystem() {
+        FileSystem.initialize(new Configuration(), null);
     }
 
     @Override
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml 
b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index 58956e8..6c3799d 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -101,6 +101,12 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml 
b/flink-filesystems/flink-s3-fs-presto/pom.xml
index d1f7493..9a87655 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -69,6 +69,12 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
                <dependency>
                        <groupId>org.apache.curator</groupId>
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
index f960cac..9398cda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
@@ -25,18 +25,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
-import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -54,25 +51,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 public abstract class AbstractHAJobRunITCase {
 
     @RegisterExtension
+    @Order(1)
     private static final AllCallbackWrapper<ZooKeeperExtension> 
ZOOKEEPER_EXTENSION =
             new AllCallbackWrapper<>(new ZooKeeperExtension());
 
-    @RegisterExtension
-    private final EachCallbackWrapper<MiniClusterExtension> 
miniClusterExtension =
-            new EachCallbackWrapper<>(
-                    new MiniClusterExtension(
-                            new MiniClusterResourceConfiguration.Builder()
-                                    .setConfiguration(getFlinkConfiguration())
-                                    .build()));
-
-    private Configuration getFlinkConfiguration() {
-        final Configuration config = createConfiguration();
-
+    protected static Configuration addHaConfiguration(
+            final Configuration config, final String haStoragePath) {
         config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
         config.set(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                 ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
-        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
getHAStoragePath());
+        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath);
 
         // getFlinkConfiguration() is called on each new instantiation of the 
MiniCluster which is
         // happening before each test run
@@ -81,26 +70,13 @@ public abstract class AbstractHAJobRunITCase {
         return config;
     }
 
-    @AfterEach
-    public void unsetFileSystem() {
-        FileSystem.initialize(new Configuration(), null);
-    }
-
-    /**
-     * Should return the path to the HA storage which will be injected into 
the Flink configuration.
-     *
-     * @see HighAvailabilityOptions#HA_STORAGE_PATH
-     */
-    protected abstract String getHAStoragePath();
-
-    /** Initializes the {@link Configuration} used for the Flink cluster. */
-    protected abstract Configuration createConfiguration();
-
     protected void runAfterJobTermination() throws Exception {}
 
+    protected abstract MiniCluster getMiniCluster();
+
     @Test
     public void testJobExecutionInHaMode() throws Exception {
-        final MiniCluster flinkCluster = 
miniClusterExtension.getCustomExtension().getMiniCluster();
+        final MiniCluster flinkCluster = getMiniCluster();
 
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
 

Reply via email to