This is an automated email from the ASF dual-hosted git repository.

yzheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d8dc0010 Add support for rustfs test-containers (#3679)
1d8dc0010 is described below

commit 1d8dc001026e0ce4498c6a3aa3b831d1109a02db
Author: Yong Zheng <[email protected]>
AuthorDate: Sat Feb 7 15:47:12 2026 -0600

    Add support for rustfs test-containers (#3679)
---
 LICENSE                                            |   4 +
 README.md                                          |   1 +
 bom/build.gradle.kts                               |   1 +
 gradle/projects.main.properties                    |   1 +
 integration-tests/build.gradle.kts                 |   1 +
 .../it/test/CatalogFederationIntegrationTest.java  |  37 +-
 runtime/service/build.gradle.kts                   |   1 +
 .../service/it/PolarisRestCatalogRustFSIT.java     |  92 +++++
 .../service/it/RestCatalogRustFSSpecialIT.java     | 454 +++++++++++++++++++++
 .../polaris/service/it/nosql/NoSqlCatalogIT.java   |   8 +-
 .../service/spark/it/CatalogFederationIT.java      |   4 +-
 tools/rustfs-testcontainer/build.gradle.kts        |  39 ++
 .../org/apache/polaris/test/rustfs/Rustfs.java     |  46 +++
 .../apache/polaris/test/rustfs/RustfsAccess.java   |  68 +++
 .../polaris/test/rustfs/RustfsContainer.java       | 282 +++++++++++++
 .../polaris/test/rustfs/RustfsExtension.java       | 141 +++++++
 .../polaris/test/rustfs/Dockerfile-rustfs-version  |  22 +
 17 files changed, 1182 insertions(+), 20 deletions(-)

diff --git a/LICENSE b/LICENSE
index 437a6849e..b84fe1727 100644
--- a/LICENSE
+++ b/LICENSE
@@ -328,6 +328,10 @@ This product includes code from Project Nessie.
 * 
tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java
 * 
tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java
 * 
tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java
+* 
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
+* 
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
+* 
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
+* 
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
 * 
runtime/admin/src/main/java/org/apache/polaris/admintool/PolarisAdminTool.java
 * 
runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java
 * helm/polaris/tests/logging_storage_test.yaml
diff --git a/README.md b/README.md
index 6079c943f..65929f09f 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,7 @@ Apache Polaris is organized into the following modules:
         - `polaris-container-spec-helper` - Helper for container specifications
         - `polaris-immutables` - Predefined Immutables configuration & 
annotations for Polaris
         - `polaris-minio-testcontainer` - Minio test container
+        - `polaris-rustfs-testcontainer` - RustFS test container
         - `polaris-misc-types` - Miscellaneous types for Polaris
         - `polaris-version` - Versioning for Polaris
 
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index d5e5afc75..8a1fa16b9 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -31,6 +31,7 @@ dependencies {
 
     api(project(":polaris-container-spec-helper"))
     api(project(":polaris-minio-testcontainer"))
+    api(project(":polaris-rustfs-testcontainer"))
     api(project(":polaris-immutables"))
     api(project(":polaris-misc-types"))
     api(project(":polaris-version"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index fadbbadea..5cb39799f 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -38,6 +38,7 @@ aggregated-license-report=aggregated-license-report
 polaris-immutables=tools/immutables
 polaris-container-spec-helper=tools/container-spec-helper
 polaris-minio-testcontainer=tools/minio-testcontainer
+polaris-rustfs-testcontainer=tools/rustfs-testcontainer
 polaris-version=tools/version
 polaris-misc-types=tools/misc-types
 polaris-extensions-federation-hadoop=extensions/federation/hadoop
diff --git a/integration-tests/build.gradle.kts 
b/integration-tests/build.gradle.kts
index 0ac894cab..f91f327c9 100644
--- a/integration-tests/build.gradle.kts
+++ b/integration-tests/build.gradle.kts
@@ -68,6 +68,7 @@ dependencies {
   implementation(libs.s3mock.testcontainers)
   implementation(project(":polaris-runtime-test-common"))
   implementation(project(":polaris-minio-testcontainer"))
+  implementation(project(":polaris-rustfs-testcontainer"))
 }
 
 copiedCodeChecks {
diff --git 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
index e60fe3fad..3e4738f53 100644
--- 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
+++ 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
@@ -25,6 +25,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.net.URI;
 import java.util.List;
 import java.util.UUID;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.polaris.core.admin.model.AuthenticationParameters;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -52,9 +53,9 @@ import org.apache.polaris.service.it.env.PolarisApiEndpoints;
 import org.apache.polaris.service.it.env.PolarisClient;
 import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
 import org.apache.polaris.service.it.ext.SparkSessionBuilder;
-import org.apache.polaris.test.minio.Minio;
-import org.apache.polaris.test.minio.MinioAccess;
-import org.apache.polaris.test.minio.MinioExtension;
+import org.apache.polaris.test.rustfs.Rustfs;
+import org.apache.polaris.test.rustfs.RustfsAccess;
+import org.apache.polaris.test.rustfs.RustfsExtension;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
@@ -69,13 +70,13 @@ import org.junit.jupiter.api.io.TempDir;
  * Integration test for catalog federation functionality. This test verifies 
that an external
  * catalog can be created that federates with an internal catalog.
  */
-@ExtendWith(MinioExtension.class)
+@ExtendWith(RustfsExtension.class)
 @ExtendWith(PolarisIntegrationTestExtension.class)
 public class CatalogFederationIntegrationTest {
 
-  public static final String BUCKET_URI_PREFIX = 
"/minio-test-catalog-federation";
-  public static final String MINIO_ACCESS_KEY = 
"test-ak-123-catalog-federation";
-  public static final String MINIO_SECRET_KEY = 
"test-sk-123-catalog-federation";
+  public static final String BUCKET_URI_PREFIX = 
"/rustfs-test-catalog-federation";
+  public static final String RUSTFS_ACCESS_KEY = 
"test-ak-123-catalog-federation";
+  public static final String RUSTFS_SECRET_KEY = 
"test-sk-123-catalog-federation";
 
   private static PolarisClient client;
   private static CatalogApi catalogApi;
@@ -108,21 +109,22 @@ public class CatalogFederationIntegrationTest {
   static void setup(
       PolarisApiEndpoints apiEndpoints,
       ClientCredentials credentials,
-      @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) 
MinioAccess minioAccess) {
+      @Rustfs(accessKey = RUSTFS_ACCESS_KEY, secretKey = RUSTFS_SECRET_KEY)
+          RustfsAccess rustfsAccess) {
     endpoints = apiEndpoints;
     client = polarisClient(endpoints);
     String adminToken = client.obtainToken(credentials);
     managementApi = client.managementApi(adminToken);
     catalogApi = client.catalogApi(adminToken);
-    endpoint = minioAccess.s3endpoint();
+    endpoint = rustfsAccess.s3endpoint();
 
-    localStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + 
"/local_catalog");
-    remoteStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + 
"/federated_catalog");
+    localStorageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX + 
"/local_catalog");
+    remoteStorageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX + 
"/federated_catalog");
     // Allow credential vending for tables located under ns1
     remoteStorageExtraAllowedLocationNs1 =
-        minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
+        rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
     remoteStorageExtraAllowedLocationNs2 =
-        minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
+        rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
   }
 
   @AfterAll
@@ -422,7 +424,14 @@ public class CatalogFederationIntegrationTest {
 
     // Verify that write is blocked since the vended credential should only 
have read permission
     assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 
'Charlie')"))
-        .hasMessageContaining("Access Denied. (Service: S3, Status Code: 
403,");
+        .satisfies(
+            ex -> {
+              Throwable root = ExceptionUtils.getRootCause(ex);
+              assertThat(root)
+                  .isInstanceOf(
+                      
software.amazon.awssdk.services.s3.model.AccessDeniedException.class);
+              assertThat(root.getMessage()).matches("(?s).*Access 
Denied.*Status Code: 403.*");
+            });
 
     // Case 3: TABLE_WRITE_DATA should
     managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, 
tableReadDataGrant);
diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts
index 11d1b7a48..1218cb183 100644
--- a/runtime/service/build.gradle.kts
+++ b/runtime/service/build.gradle.kts
@@ -125,6 +125,7 @@ dependencies {
   testImplementation(project(":polaris-relational-jdbc"))
 
   testImplementation(project(":polaris-minio-testcontainer"))
+  testImplementation(project(":polaris-rustfs-testcontainer"))
 
   
testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests")
   
testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests")
diff --git 
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/PolarisRestCatalogRustFSIT.java
 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/PolarisRestCatalogRustFSIT.java
new file mode 100644
index 000000000..8a93c23d6
--- /dev/null
+++ 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/PolarisRestCatalogRustFSIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.storage.StorageAccessProperty;
+import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.polaris.service.it.test.PolarisRestCatalogIntegrationBase;
+import org.apache.polaris.test.rustfs.Rustfs;
+import org.apache.polaris.test.rustfs.RustfsAccess;
+import org.apache.polaris.test.rustfs.RustfsExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@QuarkusIntegrationTest
+@TestProfile(PolarisRestCatalogRustFSIT.Profile.class)
+@ExtendWith(RustfsExtension.class)
+@ExtendWith(PolarisIntegrationTestExtension.class)
+public class PolarisRestCatalogRustFSIT extends 
PolarisRestCatalogIntegrationBase {
+
+  protected static final String BUCKET_URI_PREFIX = "/rustfs-test-polaris";
+  protected static final String RUSTFS_ACCESS_KEY = "test-ak-123-polaris";
+  protected static final String RUSTFS_SECRET_KEY = "test-sk-123-polaris";
+
+  public static class Profile implements QuarkusTestProfile {
+
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      return ImmutableMap.<String, String>builder()
+          .put("polaris.storage.aws.access-key", RUSTFS_ACCESS_KEY)
+          .put("polaris.storage.aws.secret-key", RUSTFS_SECRET_KEY)
+          .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", 
"false")
+          .build();
+    }
+  }
+
+  private static URI storageBase;
+  private static String endpoint;
+
+  @BeforeAll
+  static void setup(
+      @Rustfs(accessKey = RUSTFS_ACCESS_KEY, secretKey = RUSTFS_SECRET_KEY)
+          RustfsAccess rustfsAccess) {
+    storageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX);
+    endpoint = rustfsAccess.s3endpoint();
+  }
+
+  @Override
+  protected ImmutableMap.Builder<String, String> clientFileIOProperties() {
+    return super.clientFileIOProperties()
+        .put(StorageAccessProperty.AWS_ENDPOINT.getPropertyName(), endpoint)
+        .put(StorageAccessProperty.AWS_PATH_STYLE_ACCESS.getPropertyName(), 
"true")
+        .put(StorageAccessProperty.AWS_KEY_ID.getPropertyName(), 
RUSTFS_ACCESS_KEY)
+        .put(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(), 
RUSTFS_SECRET_KEY);
+  }
+
+  @Override
+  protected StorageConfigInfo getStorageConfigInfo() {
+    AwsStorageConfigInfo.Builder storageConfig =
+        AwsStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setPathStyleAccess(true)
+            .setEndpoint(endpoint)
+            .setAllowedLocations(List.of(storageBase.toString()));
+
+    return storageConfig.build();
+  }
+}
diff --git 
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java
 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java
new file mode 100644
index 000000000..ba137cd6c
--- /dev/null
+++ 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.iceberg.CatalogProperties.TABLE_DEFAULT_PREFIX;
+import static 
org.apache.iceberg.aws.AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.polaris.core.storage.StorageAccessProperty.AWS_KEY_ID;
+import static 
org.apache.polaris.core.storage.StorageAccessProperty.AWS_SECRET_KEY;
+import static 
org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
+import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.catalog.AccessDelegationMode;
+import org.apache.polaris.service.it.env.CatalogApi;
+import org.apache.polaris.service.it.env.ClientCredentials;
+import org.apache.polaris.service.it.env.ManagementApi;
+import org.apache.polaris.service.it.env.PolarisApiEndpoints;
+import org.apache.polaris.service.it.env.PolarisClient;
+import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.polaris.test.rustfs.Rustfs;
+import org.apache.polaris.test.rustfs.RustfsAccess;
+import org.apache.polaris.test.rustfs.RustfsExtension;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+/**
+ * These tests complement {@link PolarisRestCatalogRustFSIT} to validate 
client-side access to
+ * RustFS storage via {@code FileIO} instances configured from catalog's 
{@code loadTable} responses
+ * with some S3-specific options.
+ */
+@QuarkusIntegrationTest
+@TestProfile(RestCatalogRustFSSpecialIT.Profile.class)
+@ExtendWith(RustfsExtension.class)
+@ExtendWith(PolarisIntegrationTestExtension.class)
+public class RestCatalogRustFSSpecialIT {
+
+  private static final String BUCKET_URI_PREFIX = "/rustfs-test";
+  private static final String RUSTFS_ACCESS_KEY = "test-ak-123";
+  private static final String RUSTFS_SECRET_KEY = "test-sk-123";
+  private static String adminToken;
+
+  public static class Profile implements QuarkusTestProfile {
+
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      return ImmutableMap.<String, String>builder()
+          .put("polaris.storage.aws.access-key", RUSTFS_ACCESS_KEY)
+          .put("polaris.storage.aws.secret-key", RUSTFS_SECRET_KEY)
+          .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", 
"false")
+          .build();
+    }
+  }
+
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get(), "doc"),
+          optional(2, "data", Types.StringType.get()));
+
+  private static PolarisApiEndpoints endpoints;
+  private static PolarisClient client;
+  private static ManagementApi managementApi;
+  private static URI storageBase;
+  private static String endpoint;
+  private static S3Client s3Client;
+
+  private CatalogApi catalogApi;
+  private String principalRoleName;
+  private PrincipalWithCredentials principalCredentials;
+  private String catalogName;
+
+  @BeforeAll
+  static void setup(
+      PolarisApiEndpoints apiEndpoints,
+      @Rustfs(accessKey = RUSTFS_ACCESS_KEY, secretKey = RUSTFS_SECRET_KEY)
+          RustfsAccess rustfsAccess,
+      ClientCredentials credentials) {
+    s3Client = rustfsAccess.s3Client();
+    endpoints = apiEndpoints;
+    client = polarisClient(endpoints);
+    adminToken = client.obtainToken(credentials);
+    managementApi = client.managementApi(adminToken);
+    storageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX);
+    endpoint = rustfsAccess.s3endpoint();
+  }
+
+  @AfterAll
+  static void close() throws Exception {
+    client.close();
+  }
+
+  @BeforeEach
+  public void before(TestInfo testInfo) {
+    String principalName = client.newEntityName("test-user");
+    principalRoleName = client.newEntityName("test-admin");
+    principalCredentials = 
managementApi.createPrincipalWithRole(principalName, principalRoleName);
+
+    String principalToken = client.obtainToken(principalCredentials);
+    catalogApi = client.catalogApi(principalToken);
+
+    catalogName = 
client.newEntityName(testInfo.getTestMethod().orElseThrow().getName());
+  }
+
+  private RESTCatalog createCatalog(
+      Optional<String> endpoint,
+      Optional<String> stsEndpoint,
+      boolean pathStyleAccess,
+      Optional<AccessDelegationMode> delegationMode,
+      boolean stsEnabled) {
+    return createCatalog(
+        endpoint, stsEndpoint, pathStyleAccess, Optional.empty(), 
delegationMode, stsEnabled);
+  }
+
+  private RESTCatalog createCatalog(
+      Optional<String> endpoint,
+      Optional<String> stsEndpoint,
+      boolean pathStyleAccess,
+      Optional<String> endpointInternal,
+      Optional<AccessDelegationMode> delegationMode,
+      boolean stsEnabled) {
+    AwsStorageConfigInfo.Builder storageConfig =
+        AwsStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setPathStyleAccess(pathStyleAccess)
+            .setStsUnavailable(!stsEnabled)
+            .setAllowedLocations(List.of(storageBase.toString()));
+
+    endpoint.ifPresent(storageConfig::setEndpoint);
+    stsEndpoint.ifPresent(storageConfig::setStsEndpoint);
+    endpointInternal.ifPresent(storageConfig::setEndpointInternal);
+
+    CatalogProperties.Builder catalogProps =
+        CatalogProperties.builder(storageBase.toASCIIString() + "/" + 
catalogName);
+    if (!stsEnabled) {
+      catalogProps.addProperty(
+          TABLE_DEFAULT_PREFIX + AWS_KEY_ID.getPropertyName(), 
RUSTFS_ACCESS_KEY);
+      catalogProps.addProperty(
+          TABLE_DEFAULT_PREFIX + AWS_SECRET_KEY.getPropertyName(), 
RUSTFS_SECRET_KEY);
+    }
+    Catalog catalog =
+        PolarisCatalog.builder()
+            .setType(Catalog.TypeEnum.INTERNAL)
+            .setName(catalogName)
+            .setStorageConfigInfo(storageConfig.build())
+            .setProperties(catalogProps.build())
+            .build();
+
+    managementApi.createCatalog(principalRoleName, catalog);
+
+    String authToken = client.obtainToken(principalCredentials);
+    RESTCatalog restCatalog = new RESTCatalog();
+
+    ImmutableMap.Builder<String, String> propertiesBuilder =
+        ImmutableMap.<String, String>builder()
+            .put(
+                org.apache.iceberg.CatalogProperties.URI, 
endpoints.catalogApiEndpoint().toString())
+            .put(OAuth2Properties.TOKEN, authToken)
+            .put("warehouse", catalogName)
+            .putAll(endpoints.extraHeaders("header."));
+
+    delegationMode.ifPresent(
+        dm -> propertiesBuilder.put("header.X-Iceberg-Access-Delegation", 
dm.protocolValue()));
+
+    if (delegationMode.isEmpty()) {
+      // Use local credentials on the client side
+      propertiesBuilder.put("s3.access-key-id", RUSTFS_ACCESS_KEY);
+      propertiesBuilder.put("s3.secret-access-key", RUSTFS_SECRET_KEY);
+    }
+
+    restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast());
+    return restCatalog;
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    client.cleanUp(adminToken);
+  }
+
+  @ParameterizedTest
+  @CsvSource("true,  true,")
+  @CsvSource("false, true,")
+  @CsvSource("true,  false,")
+  @CsvSource("false, false,")
+  public void testCreateTable(boolean pathStyle, boolean stsEnabled) throws 
IOException {
+    LoadTableResponse response = doTestCreateTable(pathStyle, 
Optional.empty(), stsEnabled);
+    assertThat(response.config()).doesNotContainKey(SECRET_ACCESS_KEY);
+    assertThat(response.config()).doesNotContainKey(ACCESS_KEY_ID);
+    
assertThat(response.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT);
+    assertThat(response.credentials()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCreateTableVendedCredentials(boolean pathStyle) throws 
IOException {
+    LoadTableResponse response =
+        doTestCreateTable(pathStyle, Optional.of(VENDED_CREDENTIALS), true);
+    assertThat(response.config())
+        .containsEntry(
+            REFRESH_CREDENTIALS_ENDPOINT,
+            "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
+    assertThat(response.credentials()).hasSize(1);
+  }
+
+  private LoadTableResponse doTestCreateTable(
+      boolean pathStyle, Optional<AccessDelegationMode> dm, boolean 
stsEnabled) throws IOException {
+    try (RESTCatalog restCatalog =
+        createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle, dm, 
stsEnabled)) {
+      LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, dm);
+      if (pathStyle) {
+        assertThat(loadTableResponse.config())
+            .containsEntry("s3.path-style-access", Boolean.TRUE.toString());
+      }
+      return loadTableResponse;
+    }
+  }
+
+  @Test
+  public void testInternalEndpoints() throws IOException {
+    try (RESTCatalog restCatalog =
+        createCatalog(
+            Optional.of("http://s3.example.com";),
+            Optional.of(endpoint),
+            false,
+            Optional.of(endpoint),
+            Optional.empty(),
+            true)) {
+      StorageConfigInfo storageConfig =
+          managementApi.getCatalog(catalogName).getStorageConfigInfo();
+      assertThat((AwsStorageConfigInfo) storageConfig)
+          .extracting(
+              AwsStorageConfigInfo::getEndpoint,
+              AwsStorageConfigInfo::getStsEndpoint,
+              AwsStorageConfigInfo::getEndpointInternal,
+              AwsStorageConfigInfo::getPathStyleAccess)
+          .containsExactly("http://s3.example.com";, endpoint, endpoint, false);
+      LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, 
Optional.empty());
+      assertThat(loadTableResponse.config()).containsEntry(ENDPOINT, 
"http://s3.example.com";);
+    }
+  }
+
+  @Test
+  public void testCreateTableFailureWithCredentialVendingWithoutSts() throws 
IOException {
+    try (RESTCatalog restCatalog =
+        createCatalog(
+            Optional.of(endpoint),
+            Optional.of("http://sts.example.com";), // not called
+            false,
+            Optional.of(VENDED_CREDENTIALS),
+            false)) {
+      StorageConfigInfo storageConfig =
+          managementApi.getCatalog(catalogName).getStorageConfigInfo();
+      assertThat((AwsStorageConfigInfo) storageConfig)
+          .extracting(
+              AwsStorageConfigInfo::getEndpoint,
+              AwsStorageConfigInfo::getStsEndpoint,
+              AwsStorageConfigInfo::getEndpointInternal,
+              AwsStorageConfigInfo::getPathStyleAccess,
+              AwsStorageConfigInfo::getStsUnavailable)
+          .containsExactly(endpoint, "http://sts.example.com";, null, false, 
true);
+
+      catalogApi.createNamespace(catalogName, "test-ns");
+      TableIdentifier id = TableIdentifier.of("test-ns", "t2");
+      // Credential vending is not supported without STS
+      assertThatThrownBy(() -> restCatalog.createTable(id, SCHEMA))
+          .hasMessageContaining("but no credentials are available")
+          .hasMessageContaining(id.toString());
+    }
+  }
+
+  @Test
+  public void testLoadTableFailureWithCredentialVendingWithoutSts() throws 
IOException {
+    try (RESTCatalog restCatalog =
+        createCatalog(
+            Optional.of(endpoint),
+            Optional.of("http://sts.example.com";), // not called
+            false,
+            Optional.empty(),
+            false)) {
+
+      catalogApi.createNamespace(catalogName, "test-ns");
+      TableIdentifier id = TableIdentifier.of("test-ns", "t3");
+      restCatalog.createTable(id, SCHEMA);
+
+      // Credential vending is not supported without STS
+      assertThatThrownBy(
+              () ->
+                  catalogApi.loadTable(
+                      catalogName,
+                      id,
+                      "ALL",
+                      Map.of("X-Iceberg-Access-Delegation", 
VENDED_CREDENTIALS.protocolValue())))
+          .hasMessageContaining("but no credentials are available")
+          .hasMessageContaining(id.toString());
+    }
+  }
+
+  public LoadTableResponse doTestCreateTable(
+      RESTCatalog restCatalog, Optional<AccessDelegationMode> dm) {
+    catalogApi.createNamespace(catalogName, "test-ns");
+    TableIdentifier id = TableIdentifier.of("test-ns", "t1");
+    Table table = restCatalog.createTable(id, SCHEMA);
+    assertThat(table).isNotNull();
+    assertThat(restCatalog.tableExists(id)).isTrue();
+
+    TableOperations ops = ((HasTableOperations) table).operations();
+    URI location = URI.create(ops.current().metadataFileLocation());
+
+    GetObjectResponse response =
+        s3Client
+            .getObject(
+                GetObjectRequest.builder()
+                    .bucket(location.getAuthority())
+                    .key(location.getPath().substring(1)) // drop leading slash
+                    .build())
+            .response();
+    assertThat(response.contentLength()).isGreaterThan(0);
+
+    LoadTableResponse loadTableResponse =
+        catalogApi.loadTable(
+            catalogName,
+            id,
+            "ALL",
+            dm.map(v -> Map.of("X-Iceberg-Access-Delegation", 
v.protocolValue())).orElse(Map.of()));
+
+    assertThat(loadTableResponse.config()).containsKey(ENDPOINT);
+
+    restCatalog.dropTable(id);
+    assertThat(restCatalog.tableExists(id)).isFalse();
+    return loadTableResponse;
+  }
+
+  @ParameterizedTest
+  @CsvSource("true,  true,")
+  @CsvSource("false, true,")
+  @CsvSource("true,  false,")
+  @CsvSource("false, false,")
+  @CsvSource("true,  true,  VENDED_CREDENTIALS")
+  @CsvSource("false, true,  VENDED_CREDENTIALS")
+  public void testAppendFiles(
+      boolean pathStyle, boolean stsEnabled, AccessDelegationMode 
delegationMode)
+      throws IOException {
+    try (RESTCatalog restCatalog =
+        createCatalog(
+            Optional.of(endpoint),
+            Optional.of(endpoint),
+            pathStyle,
+            Optional.ofNullable(delegationMode),
+            stsEnabled)) {
+      catalogApi.createNamespace(catalogName, "test-ns");
+      TableIdentifier id = TableIdentifier.of("test-ns", "t1");
+      Table table = restCatalog.createTable(id, SCHEMA);
+      assertThat(table).isNotNull();
+
+      @SuppressWarnings("resource")
+      FileIO io = table.io();
+
+      URI loc =
+          URI.create(
+              table
+                  .locationProvider()
+                  .newDataLocation(
+                      String.format(
+                          "test-file-%s-%s-%s.txt", pathStyle, delegationMode, 
stsEnabled)));
+      OutputFile f1 = io.newOutputFile(loc.toString());
+      try (PositionOutputStream os = f1.create()) {
+        os.write("Hello World".getBytes(UTF_8));
+      }
+
+      DataFile df =
+          DataFiles.builder(PartitionSpec.unpartitioned())
+              .withPath(f1.location())
+              .withFormat(FileFormat.PARQUET) // bogus value
+              .withFileSizeInBytes(4)
+              .withRecordCount(1)
+              .build();
+
+      table.newAppend().appendFile(df).commit();
+
+      try (InputStream is =
+          s3Client.getObject(
+              GetObjectRequest.builder()
+                  .bucket(loc.getAuthority())
+                  .key(loc.getPath().substring(1)) // drop leading slash
+                  .build())) {
+        assertThat(new String(is.readAllBytes(), UTF_8)).isEqualTo("Hello 
World");
+      }
+    }
+  }
+}
diff --git 
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
index ba99caab5..f4cd8ec8d 100644
--- 
a/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
+++ 
b/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
@@ -22,18 +22,18 @@ import com.google.common.collect.ImmutableMap;
 import io.quarkus.test.junit.QuarkusIntegrationTest;
 import io.quarkus.test.junit.TestProfile;
 import java.util.Map;
-import org.apache.polaris.service.it.PolarisRestCatalogMinIOIT;
+import org.apache.polaris.service.it.PolarisRestCatalogRustFSIT;
 
 @QuarkusIntegrationTest
 @TestProfile(value = NoSqlCatalogIT.Profile.class)
-public class NoSqlCatalogIT extends PolarisRestCatalogMinIOIT {
+public class NoSqlCatalogIT extends PolarisRestCatalogRustFSIT {
   public static class Profile extends NoSqlTesting.PersistenceInMemoryProfile {
     @Override
     public Map<String, String> getConfigOverrides() {
       return ImmutableMap.<String, String>builder()
           .putAll(super.getConfigOverrides())
-          .put("polaris.storage.aws.access-key", MINIO_ACCESS_KEY)
-          .put("polaris.storage.aws.secret-key", MINIO_SECRET_KEY)
+          .put("polaris.storage.aws.access-key", RUSTFS_ACCESS_KEY)
+          .put("polaris.storage.aws.secret-key", RUSTFS_SECRET_KEY)
           .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", 
"false")
           .build();
     }
diff --git 
a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
 
b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
index 60e01f9a0..a05211b30 100644
--- 
a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
+++ 
b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
@@ -41,8 +41,8 @@ public class CatalogFederationIT extends 
CatalogFederationIntegrationTest {
           .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", 
"false")
           
.put("polaris.features.\"ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING\"", "true")
           
.put("polaris.features.\"ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING\"", "true")
-          .put("polaris.storage.aws.access-key", 
CatalogFederationIntegrationTest.MINIO_ACCESS_KEY)
-          .put("polaris.storage.aws.secret-key", 
CatalogFederationIntegrationTest.MINIO_SECRET_KEY)
+          .put("polaris.storage.aws.access-key", 
CatalogFederationIntegrationTest.RUSTFS_ACCESS_KEY)
+          .put("polaris.storage.aws.secret-key", 
CatalogFederationIntegrationTest.RUSTFS_SECRET_KEY)
           .build();
     }
   }
diff --git a/tools/rustfs-testcontainer/build.gradle.kts 
b/tools/rustfs-testcontainer/build.gradle.kts
new file mode 100644
index 000000000..5aa1eb9cd
--- /dev/null
+++ b/tools/rustfs-testcontainer/build.gradle.kts
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+dependencies {
+  api(platform(libs.testcontainers.bom))
+  api("org.testcontainers:testcontainers")
+
+  api(platform(libs.awssdk.bom))
+  api("software.amazon.awssdk:s3")
+  api("software.amazon.awssdk:kms")
+
+  implementation(project(":polaris-container-spec-helper"))
+  implementation("software.amazon.awssdk:url-connection-client")
+  implementation(libs.guava)
+
+  compileOnly(platform(libs.junit.bom))
+  compileOnly("org.junit.jupiter:junit-jupiter-api")
+}
diff --git 
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
new file mode 100644
index 000000000..a18f9791d
--- /dev/null
+++ 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+@Target({ElementType.FIELD, ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface Rustfs {
+  /** Optional, use this access key instead of a random one. */
+  String accessKey() default DEFAULT;
+
+  /** Optional, use this secret key instead of a random one. */
+  String secretKey() default DEFAULT;
+
+  /** Optional, use this bucket instead of a random one. */
+  String bucket() default DEFAULT;
+
+  /** Optional, use this region. */
+  String region() default DEFAULT;
+
+  String DEFAULT = "rustfs_default_value__";
+}
diff --git 
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
new file mode 100644
index 000000000..d49c71e25
--- /dev/null
+++ 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+
+/**
+ * Provides access to Rustfs via a preconfigured S3 client and providing the 
by default randomized
+ * bucket and access/secret keys.
+ *
+ * <p>Annotate JUnit test instance or static fields or method parameters of 
this type with {@link
+ * Rustfs}.
+ */
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+public interface RustfsAccess {
+
+  /** Host and port, separated by '{@code :}'. */
+  String hostPort();
+
+  String accessKey();
+
+  String secretKey();
+
+  String bucket();
+
+  Optional<String> region();
+
+  /** HTTP protocol endpoint. */
+  String s3endpoint();
+
+  S3Client s3Client();
+
+  /** Properties needed by Apache Iceberg to access this instance. */
+  Map<String, String> icebergProperties();
+
+  /** Properties needed by Apache Hadoop to access this instance. */
+  Map<String, String> hadoopConfig();
+
+  /** S3 scheme URI including the bucket to access the given path. */
+  URI s3BucketUri(String path);
+
+  /** Convenience method to put an object into S3. */
+  @SuppressWarnings("resource")
+  default void s3put(String key, RequestBody body) {
+    s3Client().putObject(b -> b.bucket(bucket()).key(key), body);
+  }
+}
diff --git 
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
new file mode 100644
index 000000000..bac77279b
--- /dev/null
+++ 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import com.google.common.base.Preconditions;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.polaris.containerspec.ContainerSpecHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+public final class RustfsContainer extends GenericContainer<RustfsContainer>
+    implements RustfsAccess, AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RustfsContainer.class);
+
+  private static final int S3_API_PORT = 9000;
+  private static final int CONSOLE_PORT = 9001;
+
+  private static final String RUSTFS_ACCESS_KEY = "RUSTFS_ACCESS_KEY";
+  private static final String RUSTFS_SECRET_KEY = "RUSTFS_SECRET_KEY";
+  private static final String RUSTFS_DOMAIN = "RUSTFS_SERVER_DOMAINS";
+
+  private static final String HEALTH_ENDPOINT = "/health";
+  private static final String RUSTFS_DOMAIN_NAME;
+
+  /**
+   * Domain must start with "s3" in order to be recognized as an S3 endpoint 
by the AWS SDK with
+   * virtual-host-style addressing. The bucket name is expected to be the 
first part of the domain
+   * name, e.g. "bucket.s3.127-0-0-1.nip.io".
+   */
+  private static final String RUSTFS_DOMAIN_NIP = "s3.127-0-0-1.nip.io";
+
+  /**
+   * Whether random bucket names cannot be used. Randomized bucket names can 
only be used when
+   * either `*.localhost` (on Linux) or `*.s3.127-0-0-1.nip.io` (on macOS, if 
DNS rebind protection
+   * is not active) can be resolved. Otherwise we have to use a fixed bucket 
name and users have to
+   * configure that in `/etc/hosts`.
+   */
+  private static final String FIXED_BUCKET_NAME;
+
+  static boolean canRunOnMacOs() {
+    return RUSTFS_DOMAIN_NAME.equals(RUSTFS_DOMAIN_NIP);
+  }
+
+  static {
+    String name;
+    String fixedBucketName = null;
+    if 
(System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("linux")) {
+      name = "localhost";
+    } else {
+      try {
+        InetAddress ignored = InetAddress.getByName(RUSTFS_DOMAIN_NIP);
+        name = RUSTFS_DOMAIN_NIP;
+      } catch (UnknownHostException x) {
+        LOGGER.warn(
+            "Could not resolve '{}', falling back to 'localhost'. "
+                + "This usually happens when your router or DNS provider is 
unable to resolve the nip.io addresses.",
+            RUSTFS_DOMAIN_NIP);
+        name = "localhost";
+        fixedBucketName = "rustfsbucket";
+        validateBucketHost(fixedBucketName);
+      }
+    }
+    RUSTFS_DOMAIN_NAME = name;
+    FIXED_BUCKET_NAME = fixedBucketName;
+  }
+
+  /** Validates the bucket host name, on non-Linux, if necessary. */
+  private static String validateBucketHost(String bucketName) {
+    if (FIXED_BUCKET_NAME != null) {
+      String test = bucketName + ".localhost";
+      try {
+        InetAddress ignored = InetAddress.getByName(test);
+      } catch (UnknownHostException e) {
+        LOGGER.warn(
+            "Could not resolve '{}',\n  Please add the line \n    '127.0.0.1   
{}'\n  to your local '/etc/hosts' file.\n  Tests are expected to fail unless 
name resolution works.",
+            test,
+            test);
+      }
+    }
+    return bucketName;
+  }
+
+  private final String accessKey;
+  private final String secretKey;
+  private final String bucket;
+
+  private String hostPort;
+  private String s3endpoint;
+  private S3Client s3;
+  private Optional<String> region;
+
+  @SuppressWarnings("unused")
+  public RustfsContainer() {
+    this(null, null, null, null, null);
+  }
+
+  @SuppressWarnings("resource")
+  public RustfsContainer(
+      String image, String accessKey, String secretKey, String bucket, String 
region) {
+    super(
+        ContainerSpecHelper.containerSpecHelper("rustfs", 
RustfsContainer.class)
+            .dockerImageName(image));
+    withNetworkAliases(randomString("rustfs"));
+    withLogConsumer(new 
Slf4jLogConsumer(LoggerFactory.getLogger(RustfsContainer.class)));
+    // A fixed S3 API port is needed due to test-containers map a random port 
to host and
+    // RustFS is very restrict on server domains.
+    // Anything not using port 443 port must define the port as part of 
RUSTFS_SERVER_DOMAINS.
+    // More detail in https://github.com/rustfs/rustfs/issues/1593
+    addFixedExposedPort(S3_API_PORT, S3_API_PORT);
+    addExposedPort(CONSOLE_PORT);
+    this.accessKey = accessKey != null ? accessKey : randomString("access");
+    this.secretKey = secretKey != null ? secretKey : randomString("secret");
+    this.bucket =
+        bucket != null
+            ? validateBucketHost(bucket)
+            : (FIXED_BUCKET_NAME != null ? FIXED_BUCKET_NAME : 
randomString("bucket"));
+    this.region = Optional.ofNullable(region);
+    withEnv(RUSTFS_ACCESS_KEY, this.accessKey);
+    withEnv(RUSTFS_SECRET_KEY, this.secretKey);
+    // S3 SDK encodes bucket names in host names - need to tell Rustfs which 
domain to use
+    withEnv(RUSTFS_DOMAIN, RUSTFS_DOMAIN_NAME + ":" + S3_API_PORT);
+    setWaitStrategy(
+        new HttpWaitStrategy()
+            .forPort(CONSOLE_PORT)
+            .forPath(HEALTH_ENDPOINT)
+            .withStartupTimeout(Duration.ofMinutes(2)));
+  }
+
+  public RustfsContainer withRegion(String region) {
+    this.region = Optional.of(region);
+    return this;
+  }
+
+  private static String randomString(String prefix) {
+    return prefix + "-" + Base58.randomString(6).toLowerCase(Locale.ROOT);
+  }
+
+  @Override
+  public String hostPort() {
+    Preconditions.checkState(hostPort != null, "Container not yet started");
+    return hostPort;
+  }
+
+  @Override
+  public String accessKey() {
+    return accessKey;
+  }
+
+  @Override
+  public String secretKey() {
+    return secretKey;
+  }
+
+  @Override
+  public String bucket() {
+    return bucket;
+  }
+
+  @Override
+  public Optional<String> region() {
+    return region;
+  }
+
+  @Override
+  public String s3endpoint() {
+    Preconditions.checkState(s3endpoint != null, "Container not yet started");
+    return s3endpoint;
+  }
+
+  @Override
+  public S3Client s3Client() {
+    Preconditions.checkState(s3 != null, "Container not yet started");
+    return s3;
+  }
+
+  @Override
+  public Map<String, String> icebergProperties() {
+    Map<String, String> props = new HashMap<>();
+    props.put("s3.access-key-id", accessKey());
+    props.put("s3.secret-access-key", secretKey());
+    props.put("s3.endpoint", s3endpoint());
+    props.put("http-client.type", "urlconnection");
+    region().ifPresent(r -> props.put("client.region", r));
+    return props;
+  }
+
+  @Override
+  public Map<String, String> hadoopConfig() {
+    Map<String, String> r = new HashMap<>();
+    r.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    r.put("fs.s3a.access.key", accessKey());
+    r.put("fs.s3a.secret.key", secretKey());
+    r.put("fs.s3a.endpoint", s3endpoint());
+    return r;
+  }
+
+  @Override
+  public URI s3BucketUri(String path) {
+    return s3BucketUri("s3", path);
+  }
+
+  public URI s3BucketUri(String scheme, String path) {
+    Preconditions.checkState(bucket != null, "Container not yet started");
+    return URI.create(String.format("%s://%s/", scheme, bucket)).resolve(path);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+
+    this.hostPort = RUSTFS_DOMAIN_NAME + ":" + getMappedPort(S3_API_PORT);
+    this.s3endpoint = String.format("http://%s/";, hostPort);
+
+    this.s3 = createS3Client();
+    
this.s3.createBucket(CreateBucketRequest.builder().bucket(bucket()).build());
+  }
+
+  @Override
+  public void close() {
+    stop();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      if (s3 != null) {
+        s3.close();
+      }
+    } finally {
+      s3 = null;
+      super.stop();
+    }
+  }
+
+  private S3Client createS3Client() {
+    return S3Client.builder()
+        .httpClientBuilder(UrlConnectionHttpClient.builder())
+        .applyMutation(builder -> 
builder.endpointOverride(URI.create(s3endpoint())))
+        .applyMutation(builder -> region.ifPresent(r -> 
builder.region(Region.of(r))))
+        // .serviceConfiguration(s3Configuration(s3PathStyleAccess, 
s3UseArnRegionEnabled))
+        // credentialsProvider(s3AccessKeyId, s3SecretAccessKey, 
s3SessionToken)
+        .credentialsProvider(
+            
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey(), 
secretKey())))
+        .build();
+  }
+}
diff --git 
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
new file mode 100644
index 000000000..becd226f6
--- /dev/null
+++ 
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import static java.lang.String.format;
+import static 
org.junit.jupiter.api.extension.ConditionEvaluationResult.disabled;
+import static 
org.junit.jupiter.api.extension.ConditionEvaluationResult.enabled;
+import static 
org.junit.platform.commons.util.AnnotationUtils.findAnnotatedFields;
+import static org.junit.platform.commons.util.ReflectionUtils.makeAccessible;
+
+import java.lang.reflect.Field;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.platform.commons.util.AnnotationUtils;
+import org.junit.platform.commons.util.ExceptionUtils;
+import org.junit.platform.commons.util.ReflectionUtils;
+
+/**
+ * JUnit extension that provides a Rustfs container configured with a single 
bucket.
+ *
+ * <p>Provides instances of {@link RustfsAccess} via instance or static fields 
or parameters
+ * annotated with {@link Rustfs}.
+ */
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+public class RustfsExtension
+    implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, 
ExecutionCondition {
+  private static final ExtensionContext.Namespace NAMESPACE =
+      ExtensionContext.Namespace.create(RustfsExtension.class);
+
+  @Override
+  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext 
context) {
+    if (OS.current() == OS.LINUX) {
+      return enabled("Running on Linux");
+    }
+    if (OS.current() == OS.MAC
+        && System.getenv("CI_MAC") == null
+        && RustfsContainer.canRunOnMacOs()) {
+      // Disable tests on GitHub Actions
+      return enabled("Running on macOS locally");
+    }
+    return disabled(format("Disabled on %s", OS.current().name()));
+  }
+
+  @Override
+  public void beforeAll(ExtensionContext context) {
+    Class<?> testClass = context.getRequiredTestClass();
+
+    findAnnotatedFields(testClass, Rustfs.class, ReflectionUtils::isStatic)
+        .forEach(field -> injectField(context, field));
+  }
+
+  @Override
+  public void beforeEach(ExtensionContext context) {
+    Class<?> testClass = context.getRequiredTestClass();
+
+    findAnnotatedFields(testClass, Rustfs.class, ReflectionUtils::isNotStatic)
+        .forEach(field -> injectField(context, field));
+  }
+
+  private void injectField(ExtensionContext context, Field field) {
+    try {
+      Rustfs rustfs =
+          AnnotationUtils.findAnnotation(field, Rustfs.class)
+              .orElseThrow(IllegalStateException::new);
+
+      RustfsAccess container =
+          context
+              .getStore(NAMESPACE)
+              .getOrComputeIfAbsent(
+                  field.toString(), x -> createContainer(rustfs), 
RustfsAccess.class);
+
+      makeAccessible(field).set(context.getTestInstance().orElse(null), 
container);
+    } catch (Throwable t) {
+      ExceptionUtils.throwAsUncheckedException(t);
+    }
+  }
+
+  @Override
+  public boolean supportsParameter(
+      ParameterContext parameterContext, ExtensionContext extensionContext)
+      throws ParameterResolutionException {
+    if (parameterContext.findAnnotation(Rustfs.class).isEmpty()) {
+      return false;
+    }
+    return 
parameterContext.getParameter().getType().isAssignableFrom(RustfsAccess.class);
+  }
+
+  @Override
+  public Object resolveParameter(
+      ParameterContext parameterContext, ExtensionContext extensionContext)
+      throws ParameterResolutionException {
+    return extensionContext
+        .getStore(NAMESPACE)
+        .getOrComputeIfAbsent(
+            RustfsExtension.class.getName() + '#' + 
parameterContext.getParameter().getName(),
+            k -> {
+              Rustfs rustfs = 
parameterContext.findAnnotation(Rustfs.class).get();
+              return createContainer(rustfs);
+            },
+            RustfsAccess.class);
+  }
+
+  private RustfsAccess createContainer(Rustfs rustfs) {
+    String accessKey = nonDefault(rustfs.accessKey());
+    String secretKey = nonDefault(rustfs.secretKey());
+    String bucket = nonDefault(rustfs.bucket());
+    String region = nonDefault(rustfs.region());
+    RustfsContainer container =
+        new RustfsContainer(null, accessKey, secretKey, bucket, 
region).withStartupAttempts(5);
+    container.start();
+    return container;
+  }
+
+  private static String nonDefault(String s) {
+    return s.equals(Rustfs.DEFAULT) ? null : s;
+  }
+}
diff --git 
a/tools/rustfs-testcontainer/src/main/resources/org/apache/polaris/test/rustfs/Dockerfile-rustfs-version
 
b/tools/rustfs-testcontainer/src/main/resources/org/apache/polaris/test/rustfs/Dockerfile-rustfs-version
new file mode 100644
index 000000000..194d5402b
--- /dev/null
+++ 
b/tools/rustfs-testcontainer/src/main/resources/org/apache/polaris/test/rustfs/Dockerfile-rustfs-version
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Dockerfile to provide the image name and tag to a test.
+# Version is managed by Renovate - do not edit.
+FROM rustfs/rustfs:1.0.0-alpha.82
\ No newline at end of file

Reply via email to