This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 576354cf44a41a1f8d7c0d12801c91badc1e9fcf Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Feb 24 14:23:38 2022 +0100 [hotfix][tests] Restructure AbstractHAJobRunITCase Refactors the test to make use of a static MiniClusterExtension, in preparation for FLINK-26252. --- flink-filesystems/flink-s3-fs-base/pom.xml | 6 ++++ .../fs/s3/common/HAJobRunOnMinioS3StoreITCase.java | 41 +++++++++++++++++----- flink-filesystems/flink-s3-fs-hadoop/pom.xml | 6 ++++ flink-filesystems/flink-s3-fs-presto/pom.xml | 6 ++++ .../highavailability/AbstractHAJobRunITCase.java | 40 +++++---------------- 5 files changed, 59 insertions(+), 40 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml index 0b4a337..9ccf84e 100644 --- a/flink-filesystems/flink-s3-fs-base/pom.xml +++ b/flink-filesystems/flink-s3-fs-base/pom.xml @@ -251,6 +251,12 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java index c8f358d..ebe71da 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java @@ -20,18 +20,25 @@ package org.apache.flink.fs.s3.common; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; 
import org.apache.flink.core.testutils.TestContainerExtension; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase; import org.apache.flink.runtime.highavailability.FileSystemJobResultStore; import org.apache.flink.runtime.highavailability.JobResultStoreOptions; +import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; import com.amazonaws.services.s3.model.S3ObjectSummary; import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; @@ -51,10 +58,29 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas private static final String JOB_RESULT_STORE_FOLDER = "jrs"; @RegisterExtension + @Order(2) private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>> MINIO_EXTENSION = new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new)); + @RegisterExtension + @Order(3) + private static final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension = + new EachCallbackWrapper<>( + new MiniClusterExtension( + () -> { + final Configuration configuration = createConfiguration(); + FileSystem.initialize(configuration, null); + return new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .build(); + })); + + @Override + public MiniCluster getMiniCluster() { + return miniClusterExtension.getCustomExtension().getMiniCluster(); + } + private static MinioTestContainer getMinioContainer() { return MINIO_EXTENSION.getCustomExtension().getTestContainer(); } @@ -77,13 +103,7 
@@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas return pathSeparator + StringUtils.join(subfolders, pathSeparator); } - @Override - protected String getHAStoragePath() { - return createS3URIWithSubPath(CLUSTER_ID); - } - - @Override - protected Configuration createConfiguration() { + private static Configuration createConfiguration() { final Configuration config = new Configuration(); getMinioContainer().setS3ConfigOptions(config); @@ -94,7 +114,12 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas JobResultStoreOptions.STORAGE_PATH, createS3URIWithSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER)); - return config; + return addHaConfiguration(config, createS3URIWithSubPath(CLUSTER_ID)); + } + + @AfterAll + public static void unsetFileSystem() { + FileSystem.initialize(new Configuration(), null); } @Override diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml index 58956e8..6c3799d 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml +++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml @@ -101,6 +101,12 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml index d1f7493..9a87655 100644 --- a/flink-filesystems/flink-s3-fs-presto/pom.xml +++ b/flink-filesystems/flink-s3-fs-presto/pom.xml @@ -69,6 +69,12 @@ under the License. 
<type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.curator</groupId> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java index f960cac..9398cda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java @@ -25,18 +25,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.testutils.AllCallbackWrapper; -import org.apache.flink.core.testutils.EachCallbackWrapper; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.testutils.MiniClusterExtension; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.FutureUtils; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; @@ -54,25 +51,17 @@ import static org.assertj.core.api.Assertions.assertThat; public abstract class AbstractHAJobRunITCase { @RegisterExtension + @Order(1) private static final AllCallbackWrapper<ZooKeeperExtension> 
ZOOKEEPER_EXTENSION = new AllCallbackWrapper<>(new ZooKeeperExtension()); - @RegisterExtension - private final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension = - new EachCallbackWrapper<>( - new MiniClusterExtension( - new MiniClusterResourceConfiguration.Builder() - .setConfiguration(getFlinkConfiguration()) - .build())); - - private Configuration getFlinkConfiguration() { - final Configuration config = createConfiguration(); - + protected static Configuration addHaConfiguration( + final Configuration config, final String haStoragePath) { config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.set( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); - config.set(HighAvailabilityOptions.HA_STORAGE_PATH, getHAStoragePath()); + config.set(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath); // getFlinkConfiguration() is called on each new instantiation of the MiniCluster which is // happening before each test run @@ -81,26 +70,13 @@ public abstract class AbstractHAJobRunITCase { return config; } - @AfterEach - public void unsetFileSystem() { - FileSystem.initialize(new Configuration(), null); - } - - /** - * Should return the path to the HA storage which will be injected into the Flink configuration. - * - * @see HighAvailabilityOptions#HA_STORAGE_PATH - */ - protected abstract String getHAStoragePath(); - - /** Initializes the {@link Configuration} used for the Flink cluster. */ - protected abstract Configuration createConfiguration(); - protected void runAfterJobTermination() throws Exception {} + protected abstract MiniCluster getMiniCluster(); + @Test public void testJobExecutionInHaMode() throws Exception { - final MiniCluster flinkCluster = miniClusterExtension.getCustomExtension().getMiniCluster(); + final MiniCluster flinkCluster = getMiniCluster(); final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
