zentol commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r803621829



##########
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This 
code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+    private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+    private static final int DEFAULT_PORT = 9000;
+
+    private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+    private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+    private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+    private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+    private final String accessKey;
+    private final String secretKey;
+    private final String defaultBucketName;
+
+    public MinioTestContainer() {
+        this(randomString("bucket", 6));
+    }
+
+    public MinioTestContainer(String defaultBucketName) {
+        super(DockerImageVersions.MINIO);
+
+        this.accessKey = randomString("accessKey", 10);
+        // secrets must have at least 8 characters
+        this.secretKey = randomString("secret", 10);
+        this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+        withNetworkAliases(randomString("minio", 6));
+        addExposedPort(DEFAULT_PORT);

Review comment:
       For my understanding; this is the port that minio runs on within the 
container; but the port that we use to query minio is then determined by 
`getMappedPort` (which presumably returns some random port)?

##########
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This 
code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+    private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+    private static final int DEFAULT_PORT = 9000;
+
+    private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+    private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+    private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+    private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+    private final String accessKey;
+    private final String secretKey;
+    private final String defaultBucketName;
+
+    public MinioTestContainer() {
+        this(randomString("bucket", 6));
+    }
+
+    public MinioTestContainer(String defaultBucketName) {
+        super(DockerImageVersions.MINIO);
+
+        this.accessKey = randomString("accessKey", 10);
+        // secrets must have at least 8 characters
+        this.secretKey = randomString("secret", 10);
+        this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+        withNetworkAliases(randomString("minio", 6));
+        addExposedPort(DEFAULT_PORT);
+        withEnv(MINIO_ACCESS_KEY, this.accessKey);
+        withEnv(MINIO_SECRET_KEY, this.secretKey);
+        withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+        setWaitStrategy(
+                new HttpWaitStrategy()
+                        .forPort(DEFAULT_PORT)
+                        .forPath(HEALTH_ENDPOINT)
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+    }
+
+    @Override
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        super.containerIsStarted(containerInfo);
+        createDefaultBucket();
+    }
+
+    private static String randomString(String prefix, int length) {
+        return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+    }
+
+    /** Creates {@link AmazonS3} client for accessing the {@code Minio} 
instance. */
+    public AmazonS3 getClient() {
+        return AmazonS3Client.builder()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(accessKey, secretKey)))
+                .withPathStyleAccessEnabled(true)
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(
+                                getHttpEndpoint(), "unused-region"))
+                .build();
+    }
+
+    private String getHttpEndpoint() {
+        return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(DEFAULT_PORT));
+    }
+
+    /**
+     * Initializes the Minio instance (i.e. creating the default bucket and 
initializing Flink's
+     * FileSystems). Additionally, the passed Flink {@link Configuration} is 
extended by all
+     * relevant parameter to access the {@code Minio} instance.
+     */
+    public void updateConfigAccordingly(Configuration config) {

Review comment:
       Not a huuuge fan of the name; why not "setS3Options"?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##########
@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID 
jobId) throws IOException {
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         final Set<JobResult> dirtyResults = new HashSet<>();
-        final FileStatus fs = fileSystem.getFileStatus(this.basePath);
-        if (fs.isDir()) {
-            FileStatus[] statuses = fileSystem.listStatus(this.basePath);

Review comment:
       I don't understand how this can fix anything, because the 
S3AFileSystem#listStatus uses getFileStatus internally. The presto 
implementation for both methods also do the same request against the server 
(#listPrefix).
   
   Are we sure we're drawing the right conclusions?

##########
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This 
code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+    private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+    private static final int DEFAULT_PORT = 9000;
+
+    private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+    private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+    private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+    private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+    private final String accessKey;
+    private final String secretKey;
+    private final String defaultBucketName;
+
+    public MinioTestContainer() {
+        this(randomString("bucket", 6));
+    }
+
+    public MinioTestContainer(String defaultBucketName) {
+        super(DockerImageVersions.MINIO);
+
+        this.accessKey = randomString("accessKey", 10);
+        // secrets must have at least 8 characters
+        this.secretKey = randomString("secret", 10);
+        this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+        withNetworkAliases(randomString("minio", 6));
+        addExposedPort(DEFAULT_PORT);
+        withEnv(MINIO_ACCESS_KEY, this.accessKey);
+        withEnv(MINIO_SECRET_KEY, this.secretKey);
+        withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+        setWaitStrategy(
+                new HttpWaitStrategy()
+                        .forPort(DEFAULT_PORT)
+                        .forPath(HEALTH_ENDPOINT)
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+    }
+
+    @Override
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        super.containerIsStarted(containerInfo);
+        createDefaultBucket();
+    }
+
+    private static String randomString(String prefix, int length) {
+        return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+    }
+
+    /** Creates {@link AmazonS3} client for accessing the {@code Minio} 
instance. */
+    public AmazonS3 getClient() {
+        return AmazonS3Client.builder()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(accessKey, secretKey)))
+                .withPathStyleAccessEnabled(true)
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(
+                                getHttpEndpoint(), "unused-region"))
+                .build();
+    }
+
+    private String getHttpEndpoint() {
+        return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(DEFAULT_PORT));
+    }
+
+    /**
+     * Initializes the Minio instance (i.e. creating the default bucket and 
initializing Flink's
+     * FileSystems). Additionally, the passed Flink {@link Configuration} is 
extended by all
+     * relevant parameter to access the {@code Minio} instance.
+     */
+    public void updateConfigAccordingly(Configuration config) {
+        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.setString("s3.path.style.access", "true");
+        config.setString("s3.access.key", accessKey);
+        config.setString("s3.secret.key", secretKey);
+    }
+
+    public void initializeFileSystem(Configuration config) {

Review comment:
       can we move this into the test itself?
   ...
   Actually, do we need it at all?

##########
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This 
code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+    private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+    private static final int DEFAULT_PORT = 9000;
+
+    private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+    private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+    private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+    private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+    private final String accessKey;
+    private final String secretKey;
+    private final String defaultBucketName;
+
+    public MinioTestContainer() {
+        this(randomString("bucket", 6));
+    }
+
+    public MinioTestContainer(String defaultBucketName) {
+        super(DockerImageVersions.MINIO);
+
+        this.accessKey = randomString("accessKey", 10);
+        // secrets must have at least 8 characters
+        this.secretKey = randomString("secret", 10);
+        this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+        withNetworkAliases(randomString("minio", 6));
+        addExposedPort(DEFAULT_PORT);
+        withEnv(MINIO_ACCESS_KEY, this.accessKey);
+        withEnv(MINIO_SECRET_KEY, this.secretKey);
+        withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+        setWaitStrategy(
+                new HttpWaitStrategy()
+                        .forPort(DEFAULT_PORT)
+                        .forPath(HEALTH_ENDPOINT)
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+    }
+
+    @Override
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        super.containerIsStarted(containerInfo);
+        createDefaultBucket();
+    }
+
+    private static String randomString(String prefix, int length) {
+        return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+    }
+
+    /** Creates {@link AmazonS3} client for accessing the {@code Minio} 
instance. */
+    public AmazonS3 getClient() {
+        return AmazonS3Client.builder()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(accessKey, secretKey)))
+                .withPathStyleAccessEnabled(true)
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(
+                                getHttpEndpoint(), "unused-region"))
+                .build();
+    }
+
+    private String getHttpEndpoint() {
+        return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(DEFAULT_PORT));
+    }
+
+    /**
+     * Initializes the Minio instance (i.e. creating the default bucket and 
initializing Flink's
+     * FileSystems). Additionally, the passed Flink {@link Configuration} is 
extended by all
+     * relevant parameter to access the {@code Minio} instance.
+     */
+    public void updateConfigAccordingly(Configuration config) {
+        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.setString("s3.path.style.access", "true");
+        config.setString("s3.access.key", accessKey);
+        config.setString("s3.secret.key", secretKey);
+    }
+
+    public void initializeFileSystem(Configuration config) {
+        Preconditions.checkArgument(
+                config.containsKey(FLINK_CONFIG_S3_ENDPOINT),
+                FLINK_CONFIG_S3_ENDPOINT
+                        + " needs to be specified before initializing the 
FileSystems.");
+        FileSystem.initialize(config, null);

Review comment:
       Whenever you call initialize you ideally also reset it once the test is 
complete.
   
   Generally relying on FileSystem.initialize in tests is...problematic, 
because it can interfere with other tests. This is fine-ish so long as ITCases 
run in separate JVMs, but it's still good to keep that in mind.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+    @RegisterExtension
+    private static final AllCallbackWrapper<ZooKeeperExtension> 
ZOOKEEPER_EXTENSION =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    @RegisterExtension
+    public final EachCallbackWrapper<MiniClusterExtension> 
miniClusterExtension =
+            new EachCallbackWrapper<>(
+                    new MiniClusterExtension(
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setNumberTaskManagers(1)
+                                    .setNumberSlotsPerTaskManager(1)
+                                    
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                    .setConfiguration(getFlinkConfiguration())
+                                    .build()));
+
+    private Configuration getFlinkConfiguration() {
+        Configuration config = new Configuration();
+        config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+        config.set(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+                ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
createHAStoragePath());
+
+        updateConfiguration(config);
+
+        FileSystem.initialize(config, loadPluginManager());
+        initializeStorageBackend(config);
+
+        return config;
+    }
+
+    /**
+     * Should return the path to the HA storage which will be injected into 
the Flink configuration.
+     *
+     * @see HighAvailabilityOptions#HA_STORAGE_PATH
+     */
+    protected abstract String createHAStoragePath();
+
+    /**
+     * Updates the passed {@link Configuration} to point to the {@link 
FileSystem} that's subject to
+     * test.
+     */
+    protected abstract void updateConfiguration(Configuration config);
+
+    /** Runs any additional initialization that are necessary before running 
the actual test. */
+    protected void initializeStorageBackend(Configuration config) {}

Review comment:
       Is this currently required?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+    @RegisterExtension
+    private static final AllCallbackWrapper<ZooKeeperExtension> 
ZOOKEEPER_EXTENSION =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    @RegisterExtension
+    public final EachCallbackWrapper<MiniClusterExtension> 
miniClusterExtension =
+            new EachCallbackWrapper<>(
+                    new MiniClusterExtension(
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setNumberTaskManagers(1)
+                                    .setNumberSlotsPerTaskManager(1)
+                                    
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                    .setConfiguration(getFlinkConfiguration())
+                                    .build()));
+
+    private Configuration getFlinkConfiguration() {
+        Configuration config = new Configuration();
+        config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+        config.set(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+                ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
createHAStoragePath());
+
+        updateConfiguration(config);
+
+        FileSystem.initialize(config, loadPluginManager());
+        initializeStorageBackend(config);
+
+        return config;
+    }
+
+    /**
+     * Should return the path to the HA storage which will be injected into 
the Flink configuration.
+     *
+     * @see HighAvailabilityOptions#HA_STORAGE_PATH
+     */
+    protected abstract String createHAStoragePath();
+
+    /**
+     * Updates the passed {@link Configuration} to point to the {@link 
FileSystem} that's subject to
+     * test.
+     */
+    protected abstract void updateConfiguration(Configuration config);
+
+    /** Runs any additional initialization that are necessary before running 
the actual test. */
+    protected void initializeStorageBackend(Configuration config) {}
+
+    @Nullable
+    protected PluginManager loadPluginManager() {

Review comment:
       Whats the benefit in having this method?

##########
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainerTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.core.testutils.TestContainerExtension;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.Bucket;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * {@code MinioTestContainerTest} tests some basic functionality provided by 
{@link
+ * MinioTestContainer}.
+ */
+public class MinioTestContainerTest {
+
+    private static final String DEFAULT_BUCKET_NAME = "test-bucket";
+
+    @RegisterExtension
+    private static final 
EachCallbackWrapper<TestContainerExtension<MinioTestContainer>>
+            MINIO_EXTENSION =
+                    new EachCallbackWrapper<>(
+                            new TestContainerExtension<>(
+                                    () -> new 
MinioTestContainer(DEFAULT_BUCKET_NAME)));
+
+    private static MinioTestContainer getTestContainer() {
+        return MINIO_EXTENSION.getCustomExtension().getTestContainer();
+    }
+
+    private static AmazonS3 getClient() {
+        return getTestContainer().getClient();
+    }
+
+    @Test
+    public void testBucketCreation() {
+        final String otherBucketName = "other-bucket";

Review comment:
       rename to `bucketName`?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/TestContainerExtension.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.GenericContainer;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Supplier;
+
+/**
+ * {@code TestContainerExtension} provides common functionality for {@code 
TestContainer}
+ * implementations.
+ *
+ * @param <T> The {@link GenericContainer} that shall be managed.
+ */
+public class TestContainerExtension<T extends GenericContainer<T>> implements 
CustomExtension {

Review comment:
       still think this could be generalized to a "RuleToExtensionAdapter".

##########
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainerTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.TestContainerExtension;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.Bucket;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * {@code MinioTestContainerTest} tests some basic functionality provided by 
{@link
+ * MinioTestContainer}.
+ */
+public class MinioTestContainerTest {
+
+    private static final String DEFAULT_BUCKET_NAME = "test-bucket";
+
+    @RegisterExtension
+    private static final 
EachCallbackWrapper<TestContainerExtension<MinioTestContainer>>
+            MINIO_EXTENSION =
+                    new EachCallbackWrapper<>(
+                            new TestContainerExtension<>(
+                                    () -> new 
MinioTestContainer(DEFAULT_BUCKET_NAME)));
+
+    private static MinioTestContainer getTestContainer() {
+        return MINIO_EXTENSION.getCustomExtension().getTestContainer();
+    }
+
+    private static AmazonS3 getClient() {
+        return getTestContainer().getClient();
+    }
+
+    @Test
+    public void testBucketCreation() {
+        final String otherBucketName = "other-bucket";
+        final Bucket otherBucket = getClient().createBucket(otherBucketName);
+
+        assertThat(otherBucket).isNotNull();
+        
assertThat(otherBucket).extracting(Bucket::getName).isEqualTo(otherBucketName);
+
+        assertThat(getClient().listBuckets())
+                .map(Bucket::getName)
+                .containsExactlyInAnyOrder(
+                        getTestContainer().getDefaultBucketName(), 
otherBucketName);
+    }
+
+    @Test
+    public void testPutObject() throws IOException {
+        final String bucketName = "other-bucket";
+
+        getClient().createBucket(bucketName);
+        final String objectId = "test-object";
+        final String content = "test content";
+        getClient().putObject(bucketName, objectId, content);
+
+        final BufferedReader reader =
+                new BufferedReader(
+                        new InputStreamReader(
+                                getClient().getObject(bucketName, 
objectId).getObjectContent()));
+        assertThat(reader.readLine()).isEqualTo(content);
+    }
+
+    @Test
+    public void testFlinkConfigurationInitialization() {
+        final Configuration config = new Configuration();
+        getTestContainer().updateConfigAccordingly(config);
+
+        assertThat(config.containsKey("s3.endpoint")).isTrue();
+        assertThat(config.containsKey("s3.path.style.access")).isTrue();
+        assertThat(config.containsKey("s3.access.key")).isTrue();
+        assertThat(config.containsKey("s3.secret.key")).isTrue();
+    }
+
+    @Test
+    public void testDefaultBucketCreation() {
+        final Configuration config = new Configuration();
+        getTestContainer().updateConfigAccordingly(config);

Review comment:
       is this required? If so, why isn't it for testPutObject?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to