This is an automated email from the ASF dual-hosted git repository.
yzheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 1d8dc0010 Add support for rustfs test-containers (#3679)
1d8dc0010 is described below
commit 1d8dc001026e0ce4498c6a3aa3b831d1109a02db
Author: Yong Zheng <[email protected]>
AuthorDate: Sat Feb 7 15:47:12 2026 -0600
Add support for rustfs test-containers (#3679)
---
LICENSE | 4 +
README.md | 1 +
bom/build.gradle.kts | 1 +
gradle/projects.main.properties | 1 +
integration-tests/build.gradle.kts | 1 +
.../it/test/CatalogFederationIntegrationTest.java | 37 +-
runtime/service/build.gradle.kts | 1 +
.../service/it/PolarisRestCatalogRustFSIT.java | 92 +++++
.../service/it/RestCatalogRustFSSpecialIT.java | 454 +++++++++++++++++++++
.../polaris/service/it/nosql/NoSqlCatalogIT.java | 8 +-
.../service/spark/it/CatalogFederationIT.java | 4 +-
tools/rustfs-testcontainer/build.gradle.kts | 39 ++
.../org/apache/polaris/test/rustfs/Rustfs.java | 46 +++
.../apache/polaris/test/rustfs/RustfsAccess.java | 68 +++
.../polaris/test/rustfs/RustfsContainer.java | 282 +++++++++++++
.../polaris/test/rustfs/RustfsExtension.java | 141 +++++++
.../polaris/test/rustfs/Dockerfile-rustfs-version | 22 +
17 files changed, 1182 insertions(+), 20 deletions(-)
diff --git a/LICENSE b/LICENSE
index 437a6849e..b84fe1727 100644
--- a/LICENSE
+++ b/LICENSE
@@ -328,6 +328,10 @@ This product includes code from Project Nessie.
*
tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java
*
tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java
*
tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java
+*
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
+*
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
+*
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
+*
tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
*
runtime/admin/src/main/java/org/apache/polaris/admintool/PolarisAdminTool.java
*
runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java
* helm/polaris/tests/logging_storage_test.yaml
diff --git a/README.md b/README.md
index 6079c943f..65929f09f 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,7 @@ Apache Polaris is organized into the following modules:
- `polaris-container-spec-helper` - Helper for container specifications
- `polaris-immutables` - Predefined Immutables configuration &
annotations for Polaris
- `polaris-minio-testcontainer` - Minio test container
+ - `polaris-rustfs-testcontainer` - RustFS test container
- `polaris-misc-types` - Miscellaneous types for Polaris
- `polaris-version` - Versioning for Polaris
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index d5e5afc75..8a1fa16b9 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -31,6 +31,7 @@ dependencies {
api(project(":polaris-container-spec-helper"))
api(project(":polaris-minio-testcontainer"))
+ api(project(":polaris-rustfs-testcontainer"))
api(project(":polaris-immutables"))
api(project(":polaris-misc-types"))
api(project(":polaris-version"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index fadbbadea..5cb39799f 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -38,6 +38,7 @@ aggregated-license-report=aggregated-license-report
polaris-immutables=tools/immutables
polaris-container-spec-helper=tools/container-spec-helper
polaris-minio-testcontainer=tools/minio-testcontainer
+polaris-rustfs-testcontainer=tools/rustfs-testcontainer
polaris-version=tools/version
polaris-misc-types=tools/misc-types
polaris-extensions-federation-hadoop=extensions/federation/hadoop
diff --git a/integration-tests/build.gradle.kts b/integration-tests/build.gradle.kts
index 0ac894cab..f91f327c9 100644
--- a/integration-tests/build.gradle.kts
+++ b/integration-tests/build.gradle.kts
@@ -68,6 +68,7 @@ dependencies {
implementation(libs.s3mock.testcontainers)
implementation(project(":polaris-runtime-test-common"))
implementation(project(":polaris-minio-testcontainer"))
+ implementation(project(":polaris-rustfs-testcontainer"))
}
copiedCodeChecks {
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
index e60fe3fad..3e4738f53 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java
@@ -25,6 +25,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import java.net.URI;
import java.util.List;
import java.util.UUID;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -52,9 +53,9 @@ import org.apache.polaris.service.it.env.PolarisApiEndpoints;
import org.apache.polaris.service.it.env.PolarisClient;
import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
import org.apache.polaris.service.it.ext.SparkSessionBuilder;
-import org.apache.polaris.test.minio.Minio;
-import org.apache.polaris.test.minio.MinioAccess;
-import org.apache.polaris.test.minio.MinioExtension;
+import org.apache.polaris.test.rustfs.Rustfs;
+import org.apache.polaris.test.rustfs.RustfsAccess;
+import org.apache.polaris.test.rustfs.RustfsExtension;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
@@ -69,13 +70,13 @@ import org.junit.jupiter.api.io.TempDir;
* Integration test for catalog federation functionality. This test verifies
that an external
* catalog can be created that federates with an internal catalog.
*/
-@ExtendWith(MinioExtension.class)
+@ExtendWith(RustfsExtension.class)
@ExtendWith(PolarisIntegrationTestExtension.class)
public class CatalogFederationIntegrationTest {
- public static final String BUCKET_URI_PREFIX =
"/minio-test-catalog-federation";
- public static final String MINIO_ACCESS_KEY =
"test-ak-123-catalog-federation";
- public static final String MINIO_SECRET_KEY =
"test-sk-123-catalog-federation";
+ public static final String BUCKET_URI_PREFIX =
"/rustfs-test-catalog-federation";
+ public static final String RUSTFS_ACCESS_KEY =
"test-ak-123-catalog-federation";
+ public static final String RUSTFS_SECRET_KEY =
"test-sk-123-catalog-federation";
private static PolarisClient client;
private static CatalogApi catalogApi;
@@ -108,21 +109,22 @@ public class CatalogFederationIntegrationTest {
static void setup(
PolarisApiEndpoints apiEndpoints,
ClientCredentials credentials,
- @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY)
MinioAccess minioAccess) {
+ @Rustfs(accessKey = RUSTFS_ACCESS_KEY, secretKey = RUSTFS_SECRET_KEY)
+ RustfsAccess rustfsAccess) {
endpoints = apiEndpoints;
client = polarisClient(endpoints);
String adminToken = client.obtainToken(credentials);
managementApi = client.managementApi(adminToken);
catalogApi = client.catalogApi(adminToken);
- endpoint = minioAccess.s3endpoint();
+ endpoint = rustfsAccess.s3endpoint();
- localStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX +
"/local_catalog");
- remoteStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX +
"/federated_catalog");
+ localStorageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX +
"/local_catalog");
+ remoteStorageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX +
"/federated_catalog");
// Allow credential vending for tables located under ns1
remoteStorageExtraAllowedLocationNs1 =
- minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
+ rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
remoteStorageExtraAllowedLocationNs2 =
- minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
+ rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
}
@AfterAll
@@ -422,7 +424,14 @@ public class CatalogFederationIntegrationTest {
// Verify that write is blocked since the vended credential should only have read permission
assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')"))
- .hasMessageContaining("Access Denied. (Service: S3, Status Code: 403,");
+ .satisfies(
+ ex -> {
+ Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root)
+ .isInstanceOf(
+ software.amazon.awssdk.services.s3.model.AccessDeniedException.class);
+ assertThat(root.getMessage()).matches("(?s).*Access Denied.*Status Code: 403.*");
+ });
// Case 3: TABLE_WRITE_DATA should
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName,
tableReadDataGrant);
diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts
index 11d1b7a48..1218cb183 100644
--- a/runtime/service/build.gradle.kts
+++ b/runtime/service/build.gradle.kts
@@ -125,6 +125,7 @@ dependencies {
testImplementation(project(":polaris-relational-jdbc"))
testImplementation(project(":polaris-minio-testcontainer"))
+ testImplementation(project(":polaris-rustfs-testcontainer"))
testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests")
testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests")
diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/PolarisRestCatalogRustFSIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/PolarisRestCatalogRustFSIT.java
new file mode 100644
index 000000000..8a93c23d6
--- /dev/null
+++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/PolarisRestCatalogRustFSIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.storage.StorageAccessProperty;
+import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.polaris.service.it.test.PolarisRestCatalogIntegrationBase;
+import org.apache.polaris.test.rustfs.Rustfs;
+import org.apache.polaris.test.rustfs.RustfsAccess;
+import org.apache.polaris.test.rustfs.RustfsExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@QuarkusIntegrationTest
+@TestProfile(PolarisRestCatalogRustFSIT.Profile.class)
+@ExtendWith(RustfsExtension.class)
+@ExtendWith(PolarisIntegrationTestExtension.class)
+public class PolarisRestCatalogRustFSIT extends
PolarisRestCatalogIntegrationBase {
+
+ protected static final String BUCKET_URI_PREFIX = "/rustfs-test-polaris";
+ protected static final String RUSTFS_ACCESS_KEY = "test-ak-123-polaris";
+ protected static final String RUSTFS_SECRET_KEY = "test-sk-123-polaris";
+
+ public static class Profile implements QuarkusTestProfile {
+
+ @Override
+ public Map<String, String> getConfigOverrides() {
+ return ImmutableMap.<String, String>builder()
+ .put("polaris.storage.aws.access-key", RUSTFS_ACCESS_KEY)
+ .put("polaris.storage.aws.secret-key", RUSTFS_SECRET_KEY)
+ .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"",
"false")
+ .build();
+ }
+ }
+
+ private static URI storageBase;
+ private static String endpoint;
+
+ @BeforeAll
+ static void setup(
+ @Rustfs(accessKey = RUSTFS_ACCESS_KEY, secretKey = RUSTFS_SECRET_KEY)
+ RustfsAccess rustfsAccess) {
+ storageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX);
+ endpoint = rustfsAccess.s3endpoint();
+ }
+
+ @Override
+ protected ImmutableMap.Builder<String, String> clientFileIOProperties() {
+ return super.clientFileIOProperties()
+ .put(StorageAccessProperty.AWS_ENDPOINT.getPropertyName(), endpoint)
+ .put(StorageAccessProperty.AWS_PATH_STYLE_ACCESS.getPropertyName(),
"true")
+ .put(StorageAccessProperty.AWS_KEY_ID.getPropertyName(),
RUSTFS_ACCESS_KEY)
+ .put(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(),
RUSTFS_SECRET_KEY);
+ }
+
+ @Override
+ protected StorageConfigInfo getStorageConfigInfo() {
+ AwsStorageConfigInfo.Builder storageConfig =
+ AwsStorageConfigInfo.builder()
+ .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+ .setPathStyleAccess(true)
+ .setEndpoint(endpoint)
+ .setAllowedLocations(List.of(storageBase.toString()));
+
+ return storageConfig.build();
+ }
+}
diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java
new file mode 100644
index 000000000..ba137cd6c
--- /dev/null
+++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogRustFSSpecialIT.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.iceberg.CatalogProperties.TABLE_DEFAULT_PREFIX;
+import static
org.apache.iceberg.aws.AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.polaris.core.storage.StorageAccessProperty.AWS_KEY_ID;
+import static
org.apache.polaris.core.storage.StorageAccessProperty.AWS_SECRET_KEY;
+import static
org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
+import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.catalog.AccessDelegationMode;
+import org.apache.polaris.service.it.env.CatalogApi;
+import org.apache.polaris.service.it.env.ClientCredentials;
+import org.apache.polaris.service.it.env.ManagementApi;
+import org.apache.polaris.service.it.env.PolarisApiEndpoints;
+import org.apache.polaris.service.it.env.PolarisClient;
+import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.polaris.test.rustfs.Rustfs;
+import org.apache.polaris.test.rustfs.RustfsAccess;
+import org.apache.polaris.test.rustfs.RustfsExtension;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+/**
+ * These tests complement {@link PolarisRestCatalogRustFSIT} to validate
client-side access to
+ * RustFS storage via {@code FileIO} instances configured from catalog's
{@code loadTable} responses
+ * with some S3-specific options.
+ */
+@QuarkusIntegrationTest
+@TestProfile(RestCatalogRustFSSpecialIT.Profile.class)
+@ExtendWith(RustfsExtension.class)
+@ExtendWith(PolarisIntegrationTestExtension.class)
+public class RestCatalogRustFSSpecialIT {
+
+ private static final String BUCKET_URI_PREFIX = "/rustfs-test";
+ private static final String RUSTFS_ACCESS_KEY = "test-ak-123";
+ private static final String RUSTFS_SECRET_KEY = "test-sk-123";
+ private static String adminToken;
+
+ public static class Profile implements QuarkusTestProfile {
+
+ @Override
+ public Map<String, String> getConfigOverrides() {
+ return ImmutableMap.<String, String>builder()
+ .put("polaris.storage.aws.access-key", RUSTFS_ACCESS_KEY)
+ .put("polaris.storage.aws.secret-key", RUSTFS_SECRET_KEY)
+ .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"",
"false")
+ .build();
+ }
+ }
+
+ private static final Schema SCHEMA =
+ new Schema(
+ required(1, "id", Types.IntegerType.get(), "doc"),
+ optional(2, "data", Types.StringType.get()));
+
+ private static PolarisApiEndpoints endpoints;
+ private static PolarisClient client;
+ private static ManagementApi managementApi;
+ private static URI storageBase;
+ private static String endpoint;
+ private static S3Client s3Client;
+
+ private CatalogApi catalogApi;
+ private String principalRoleName;
+ private PrincipalWithCredentials principalCredentials;
+ private String catalogName;
+
+ @BeforeAll
+ static void setup(
+ PolarisApiEndpoints apiEndpoints,
+ @Rustfs(accessKey = RUSTFS_ACCESS_KEY, secretKey = RUSTFS_SECRET_KEY)
+ RustfsAccess rustfsAccess,
+ ClientCredentials credentials) {
+ s3Client = rustfsAccess.s3Client();
+ endpoints = apiEndpoints;
+ client = polarisClient(endpoints);
+ adminToken = client.obtainToken(credentials);
+ managementApi = client.managementApi(adminToken);
+ storageBase = rustfsAccess.s3BucketUri(BUCKET_URI_PREFIX);
+ endpoint = rustfsAccess.s3endpoint();
+ }
+
+ @AfterAll
+ static void close() throws Exception {
+ client.close();
+ }
+
+ @BeforeEach
+ public void before(TestInfo testInfo) {
+ String principalName = client.newEntityName("test-user");
+ principalRoleName = client.newEntityName("test-admin");
+ principalCredentials =
managementApi.createPrincipalWithRole(principalName, principalRoleName);
+
+ String principalToken = client.obtainToken(principalCredentials);
+ catalogApi = client.catalogApi(principalToken);
+
+ catalogName =
client.newEntityName(testInfo.getTestMethod().orElseThrow().getName());
+ }
+
+ private RESTCatalog createCatalog(
+ Optional<String> endpoint,
+ Optional<String> stsEndpoint,
+ boolean pathStyleAccess,
+ Optional<AccessDelegationMode> delegationMode,
+ boolean stsEnabled) {
+ return createCatalog(
+ endpoint, stsEndpoint, pathStyleAccess, Optional.empty(),
delegationMode, stsEnabled);
+ }
+
+ private RESTCatalog createCatalog(
+ Optional<String> endpoint,
+ Optional<String> stsEndpoint,
+ boolean pathStyleAccess,
+ Optional<String> endpointInternal,
+ Optional<AccessDelegationMode> delegationMode,
+ boolean stsEnabled) {
+ AwsStorageConfigInfo.Builder storageConfig =
+ AwsStorageConfigInfo.builder()
+ .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+ .setPathStyleAccess(pathStyleAccess)
+ .setStsUnavailable(!stsEnabled)
+ .setAllowedLocations(List.of(storageBase.toString()));
+
+ endpoint.ifPresent(storageConfig::setEndpoint);
+ stsEndpoint.ifPresent(storageConfig::setStsEndpoint);
+ endpointInternal.ifPresent(storageConfig::setEndpointInternal);
+
+ CatalogProperties.Builder catalogProps =
+ CatalogProperties.builder(storageBase.toASCIIString() + "/" +
catalogName);
+ if (!stsEnabled) {
+ catalogProps.addProperty(
+ TABLE_DEFAULT_PREFIX + AWS_KEY_ID.getPropertyName(),
RUSTFS_ACCESS_KEY);
+ catalogProps.addProperty(
+ TABLE_DEFAULT_PREFIX + AWS_SECRET_KEY.getPropertyName(),
RUSTFS_SECRET_KEY);
+ }
+ Catalog catalog =
+ PolarisCatalog.builder()
+ .setType(Catalog.TypeEnum.INTERNAL)
+ .setName(catalogName)
+ .setStorageConfigInfo(storageConfig.build())
+ .setProperties(catalogProps.build())
+ .build();
+
+ managementApi.createCatalog(principalRoleName, catalog);
+
+ String authToken = client.obtainToken(principalCredentials);
+ RESTCatalog restCatalog = new RESTCatalog();
+
+ ImmutableMap.Builder<String, String> propertiesBuilder =
+ ImmutableMap.<String, String>builder()
+ .put(
+ org.apache.iceberg.CatalogProperties.URI,
endpoints.catalogApiEndpoint().toString())
+ .put(OAuth2Properties.TOKEN, authToken)
+ .put("warehouse", catalogName)
+ .putAll(endpoints.extraHeaders("header."));
+
+ delegationMode.ifPresent(
+ dm -> propertiesBuilder.put("header.X-Iceberg-Access-Delegation",
dm.protocolValue()));
+
+ if (delegationMode.isEmpty()) {
+ // Use local credentials on the client side
+ propertiesBuilder.put("s3.access-key-id", RUSTFS_ACCESS_KEY);
+ propertiesBuilder.put("s3.secret-access-key", RUSTFS_SECRET_KEY);
+ }
+
+ restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast());
+ return restCatalog;
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ client.cleanUp(adminToken);
+ }
+
+ @ParameterizedTest
+ @CsvSource("true, true,")
+ @CsvSource("false, true,")
+ @CsvSource("true, false,")
+ @CsvSource("false, false,")
+ public void testCreateTable(boolean pathStyle, boolean stsEnabled) throws
IOException {
+ LoadTableResponse response = doTestCreateTable(pathStyle,
Optional.empty(), stsEnabled);
+ assertThat(response.config()).doesNotContainKey(SECRET_ACCESS_KEY);
+ assertThat(response.config()).doesNotContainKey(ACCESS_KEY_ID);
+
assertThat(response.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT);
+ assertThat(response.credentials()).isEmpty();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCreateTableVendedCredentials(boolean pathStyle) throws
IOException {
+ LoadTableResponse response =
+ doTestCreateTable(pathStyle, Optional.of(VENDED_CREDENTIALS), true);
+ assertThat(response.config())
+ .containsEntry(
+ REFRESH_CREDENTIALS_ENDPOINT,
+ "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
+ assertThat(response.credentials()).hasSize(1);
+ }
+
+ private LoadTableResponse doTestCreateTable(
+ boolean pathStyle, Optional<AccessDelegationMode> dm, boolean
stsEnabled) throws IOException {
+ try (RESTCatalog restCatalog =
+ createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle, dm,
stsEnabled)) {
+ LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, dm);
+ if (pathStyle) {
+ assertThat(loadTableResponse.config())
+ .containsEntry("s3.path-style-access", Boolean.TRUE.toString());
+ }
+ return loadTableResponse;
+ }
+ }
+
+ @Test
+ public void testInternalEndpoints() throws IOException {
+ try (RESTCatalog restCatalog =
+ createCatalog(
+ Optional.of("http://s3.example.com"),
+ Optional.of(endpoint),
+ false,
+ Optional.of(endpoint),
+ Optional.empty(),
+ true)) {
+ StorageConfigInfo storageConfig =
+ managementApi.getCatalog(catalogName).getStorageConfigInfo();
+ assertThat((AwsStorageConfigInfo) storageConfig)
+ .extracting(
+ AwsStorageConfigInfo::getEndpoint,
+ AwsStorageConfigInfo::getStsEndpoint,
+ AwsStorageConfigInfo::getEndpointInternal,
+ AwsStorageConfigInfo::getPathStyleAccess)
+ .containsExactly("http://s3.example.com", endpoint, endpoint, false);
+ LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog,
Optional.empty());
+ assertThat(loadTableResponse.config()).containsEntry(ENDPOINT,
"http://s3.example.com");
+ }
+ }
+
+ @Test
+ public void testCreateTableFailureWithCredentialVendingWithoutSts() throws
IOException {
+ try (RESTCatalog restCatalog =
+ createCatalog(
+ Optional.of(endpoint),
+ Optional.of("http://sts.example.com"), // not called
+ false,
+ Optional.of(VENDED_CREDENTIALS),
+ false)) {
+ StorageConfigInfo storageConfig =
+ managementApi.getCatalog(catalogName).getStorageConfigInfo();
+ assertThat((AwsStorageConfigInfo) storageConfig)
+ .extracting(
+ AwsStorageConfigInfo::getEndpoint,
+ AwsStorageConfigInfo::getStsEndpoint,
+ AwsStorageConfigInfo::getEndpointInternal,
+ AwsStorageConfigInfo::getPathStyleAccess,
+ AwsStorageConfigInfo::getStsUnavailable)
+ .containsExactly(endpoint, "http://sts.example.com", null, false,
true);
+
+ catalogApi.createNamespace(catalogName, "test-ns");
+ TableIdentifier id = TableIdentifier.of("test-ns", "t2");
+ // Credential vending is not supported without STS
+ assertThatThrownBy(() -> restCatalog.createTable(id, SCHEMA))
+ .hasMessageContaining("but no credentials are available")
+ .hasMessageContaining(id.toString());
+ }
+ }
+
+ @Test
+ public void testLoadTableFailureWithCredentialVendingWithoutSts() throws
IOException {
+ try (RESTCatalog restCatalog =
+ createCatalog(
+ Optional.of(endpoint),
+ Optional.of("http://sts.example.com"), // not called
+ false,
+ Optional.empty(),
+ false)) {
+
+ catalogApi.createNamespace(catalogName, "test-ns");
+ TableIdentifier id = TableIdentifier.of("test-ns", "t3");
+ restCatalog.createTable(id, SCHEMA);
+
+ // Credential vending is not supported without STS
+ assertThatThrownBy(
+ () ->
+ catalogApi.loadTable(
+ catalogName,
+ id,
+ "ALL",
+ Map.of("X-Iceberg-Access-Delegation",
VENDED_CREDENTIALS.protocolValue())))
+ .hasMessageContaining("but no credentials are available")
+ .hasMessageContaining(id.toString());
+ }
+ }
+
+ public LoadTableResponse doTestCreateTable(
+ RESTCatalog restCatalog, Optional<AccessDelegationMode> dm) {
+ catalogApi.createNamespace(catalogName, "test-ns");
+ TableIdentifier id = TableIdentifier.of("test-ns", "t1");
+ Table table = restCatalog.createTable(id, SCHEMA);
+ assertThat(table).isNotNull();
+ assertThat(restCatalog.tableExists(id)).isTrue();
+
+ TableOperations ops = ((HasTableOperations) table).operations();
+ URI location = URI.create(ops.current().metadataFileLocation());
+
+ GetObjectResponse response =
+ s3Client
+ .getObject(
+ GetObjectRequest.builder()
+ .bucket(location.getAuthority())
+ .key(location.getPath().substring(1)) // drop leading slash
+ .build())
+ .response();
+ assertThat(response.contentLength()).isGreaterThan(0);
+
+ LoadTableResponse loadTableResponse =
+ catalogApi.loadTable(
+ catalogName,
+ id,
+ "ALL",
+ dm.map(v -> Map.of("X-Iceberg-Access-Delegation",
v.protocolValue())).orElse(Map.of()));
+
+ assertThat(loadTableResponse.config()).containsKey(ENDPOINT);
+
+ restCatalog.dropTable(id);
+ assertThat(restCatalog.tableExists(id)).isFalse();
+ return loadTableResponse;
+ }
+
+ @ParameterizedTest
+ @CsvSource("true, true,")
+ @CsvSource("false, true,")
+ @CsvSource("true, false,")
+ @CsvSource("false, false,")
+ @CsvSource("true, true, VENDED_CREDENTIALS")
+ @CsvSource("false, true, VENDED_CREDENTIALS")
+ public void testAppendFiles(
+ boolean pathStyle, boolean stsEnabled, AccessDelegationMode
delegationMode)
+ throws IOException {
+ try (RESTCatalog restCatalog =
+ createCatalog(
+ Optional.of(endpoint),
+ Optional.of(endpoint),
+ pathStyle,
+ Optional.ofNullable(delegationMode),
+ stsEnabled)) {
+ catalogApi.createNamespace(catalogName, "test-ns");
+ TableIdentifier id = TableIdentifier.of("test-ns", "t1");
+ Table table = restCatalog.createTable(id, SCHEMA);
+ assertThat(table).isNotNull();
+
+ @SuppressWarnings("resource")
+ FileIO io = table.io();
+
+ URI loc =
+ URI.create(
+ table
+ .locationProvider()
+ .newDataLocation(
+ String.format(
+ "test-file-%s-%s-%s.txt", pathStyle, delegationMode,
stsEnabled)));
+ OutputFile f1 = io.newOutputFile(loc.toString());
+ try (PositionOutputStream os = f1.create()) {
+ os.write("Hello World".getBytes(UTF_8));
+ }
+
+ DataFile df =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(f1.location())
+ .withFormat(FileFormat.PARQUET) // bogus value
+ .withFileSizeInBytes(4)
+ .withRecordCount(1)
+ .build();
+
+ table.newAppend().appendFile(df).commit();
+
+ try (InputStream is =
+ s3Client.getObject(
+ GetObjectRequest.builder()
+ .bucket(loc.getAuthority())
+ .key(loc.getPath().substring(1)) // drop leading slash
+ .build())) {
+ assertThat(new String(is.readAllBytes(), UTF_8)).isEqualTo("Hello
World");
+ }
+ }
+ }
+}
diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
index ba99caab5..f4cd8ec8d 100644
--- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
+++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/nosql/NoSqlCatalogIT.java
@@ -22,18 +22,18 @@ import com.google.common.collect.ImmutableMap;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import java.util.Map;
-import org.apache.polaris.service.it.PolarisRestCatalogMinIOIT;
+import org.apache.polaris.service.it.PolarisRestCatalogRustFSIT;
@QuarkusIntegrationTest
@TestProfile(value = NoSqlCatalogIT.Profile.class)
-public class NoSqlCatalogIT extends PolarisRestCatalogMinIOIT {
+public class NoSqlCatalogIT extends PolarisRestCatalogRustFSIT {
public static class Profile extends NoSqlTesting.PersistenceInMemoryProfile {
@Override
public Map<String, String> getConfigOverrides() {
return ImmutableMap.<String, String>builder()
.putAll(super.getConfigOverrides())
- .put("polaris.storage.aws.access-key", MINIO_ACCESS_KEY)
- .put("polaris.storage.aws.secret-key", MINIO_SECRET_KEY)
+ .put("polaris.storage.aws.access-key", RUSTFS_ACCESS_KEY)
+ .put("polaris.storage.aws.secret-key", RUSTFS_SECRET_KEY)
.put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"",
"false")
.build();
}
diff --git
a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
index 60e01f9a0..a05211b30 100644
---
a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
+++
b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java
@@ -41,8 +41,8 @@ public class CatalogFederationIT extends
CatalogFederationIntegrationTest {
.put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"",
"false")
.put("polaris.features.\"ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING\"", "true")
.put("polaris.features.\"ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING\"", "true")
- .put("polaris.storage.aws.access-key",
CatalogFederationIntegrationTest.MINIO_ACCESS_KEY)
- .put("polaris.storage.aws.secret-key",
CatalogFederationIntegrationTest.MINIO_SECRET_KEY)
+ .put("polaris.storage.aws.access-key",
CatalogFederationIntegrationTest.RUSTFS_ACCESS_KEY)
+ .put("polaris.storage.aws.secret-key",
CatalogFederationIntegrationTest.RUSTFS_SECRET_KEY)
.build();
}
}
diff --git a/tools/rustfs-testcontainer/build.gradle.kts
b/tools/rustfs-testcontainer/build.gradle.kts
new file mode 100644
index 000000000..5aa1eb9cd
--- /dev/null
+++ b/tools/rustfs-testcontainer/build.gradle.kts
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Build script for the RustFS test-container helper module.
+plugins {
+ id("org.kordamp.gradle.jandex")
+ id("polaris-server")
+}
+
+dependencies {
+ // Declared as `api` because the module's public types extend or return these libraries' types
+ // (RustfsContainer extends GenericContainer, RustfsAccess returns S3Client).
+ // Versions come from the respective BOM platforms.
+ api(platform(libs.testcontainers.bom))
+ api("org.testcontainers:testcontainers")
+
+ api(platform(libs.awssdk.bom))
+ api("software.amazon.awssdk:s3")
+ // NOTE(review): no KMS type is referenced by the sources added in this patch - confirm that
+ // this dependency is needed.
+ api("software.amazon.awssdk:kms")
+
+ // Internal helpers: image-name lookup (ContainerSpecHelper), the URL-connection HTTP client
+ // used by the S3 client, and Guava Preconditions.
+ implementation(project(":polaris-container-spec-helper"))
+ implementation("software.amazon.awssdk:url-connection-client")
+ implementation(libs.guava)
+
+ // RustfsExtension compiles against the JUnit Jupiter API; `compileOnly` means the consuming
+ // test project has to provide JUnit itself at runtime.
+ compileOnly(platform(libs.junit.bom))
+ compileOnly("org.junit.jupiter:junit-jupiter-api")
+}
diff --git
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
new file mode 100644
index 000000000..a18f9791d
--- /dev/null
+++
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/Rustfs.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+/**
+ * Marks a field or parameter of type {@link RustfsAccess} that {@link RustfsExtension} should
+ * populate.
+ *
+ * <p>Every attribute is optional. An attribute left at {@link #DEFAULT} is passed to the container
+ * as "not specified".
+ */
+@Target({ElementType.FIELD, ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface Rustfs {
+  /** Sentinel meaning "attribute not set"; annotation attributes cannot default to null. */
+  String DEFAULT = "rustfs_default_value__";
+
+  /** Access key to use instead of a randomly generated one. */
+  String accessKey() default DEFAULT;
+
+  /** Secret key to use instead of a randomly generated one. */
+  String secretKey() default DEFAULT;
+
+  /** Bucket name to use instead of a randomly generated one. */
+  String bucket() default DEFAULT;
+
+  /** Region to configure on the S3 client; no region is configured when left unset. */
+  String region() default DEFAULT;
+}
diff --git
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
new file mode 100644
index 000000000..d49c71e25
--- /dev/null
+++
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsAccess.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+
+/**
+ * Provides access to Rustfs via a preconfigured S3 client and providing the
by default randomized
+ * bucket and access/secret keys.
+ *
+ * <p>Annotate JUnit test instance or static fields or method parameters of
this type with {@link
+ * Rustfs}.
+ */
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+public interface RustfsAccess {
+
+ /** Host and port, separated by '{@code :}'. */
+ String hostPort();
+
+ String accessKey();
+
+ String secretKey();
+
+ String bucket();
+
+ Optional<String> region();
+
+ /** HTTP protocol endpoint. */
+ String s3endpoint();
+
+ S3Client s3Client();
+
+ /** Properties needed by Apache Iceberg to access this instance. */
+ Map<String, String> icebergProperties();
+
+ /** Properties needed by Apache Hadoop to access this instance. */
+ Map<String, String> hadoopConfig();
+
+ /** S3 scheme URI including the bucket to access the given path. */
+ URI s3BucketUri(String path);
+
+ /** Convenience method to put an object into S3. */
+ @SuppressWarnings("resource")
+ default void s3put(String key, RequestBody body) {
+ s3Client().putObject(b -> b.bucket(bucket()).key(key), body);
+ }
+}
diff --git
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
new file mode 100644
index 000000000..bac77279b
--- /dev/null
+++
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsContainer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import com.google.common.base.Preconditions;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.polaris.containerspec.ContainerSpecHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+public final class RustfsContainer extends GenericContainer<RustfsContainer>
+ implements RustfsAccess, AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RustfsContainer.class);
+
+ private static final int S3_API_PORT = 9000;
+ private static final int CONSOLE_PORT = 9001;
+
+ private static final String RUSTFS_ACCESS_KEY = "RUSTFS_ACCESS_KEY";
+ private static final String RUSTFS_SECRET_KEY = "RUSTFS_SECRET_KEY";
+ private static final String RUSTFS_DOMAIN = "RUSTFS_SERVER_DOMAINS";
+
+ private static final String HEALTH_ENDPOINT = "/health";
+ private static final String RUSTFS_DOMAIN_NAME;
+
+ /**
+ * Domain must start with "s3" in order to be recognized as an S3 endpoint
by the AWS SDK with
+ * virtual-host-style addressing. The bucket name is expected to be the
first part of the domain
+ * name, e.g. "bucket.s3.127-0-0-1.nip.io".
+ */
+ private static final String RUSTFS_DOMAIN_NIP = "s3.127-0-0-1.nip.io";
+
+ /**
+ * Whether random bucket names cannot be used. Randomized bucket names can
only be used when
+ * either `*.localhost` (on Linux) or `*.s3.127-0-0-1.nip.io` (on macOS, if
DNS rebind protection
+ * is not active) can be resolved. Otherwise we have to use a fixed bucket
name and users have to
+ * configure that in `/etc/hosts`.
+ */
+ private static final String FIXED_BUCKET_NAME;
+
+ static boolean canRunOnMacOs() {
+ return RUSTFS_DOMAIN_NAME.equals(RUSTFS_DOMAIN_NIP);
+ }
+
+ static {
+ String name;
+ String fixedBucketName = null;
+ if
(System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("linux")) {
+ name = "localhost";
+ } else {
+ try {
+ InetAddress ignored = InetAddress.getByName(RUSTFS_DOMAIN_NIP);
+ name = RUSTFS_DOMAIN_NIP;
+ } catch (UnknownHostException x) {
+ LOGGER.warn(
+ "Could not resolve '{}', falling back to 'localhost'. "
+ + "This usually happens when your router or DNS provider is
unable to resolve the nip.io addresses.",
+ RUSTFS_DOMAIN_NIP);
+ name = "localhost";
+ fixedBucketName = "rustfsbucket";
+ validateBucketHost(fixedBucketName);
+ }
+ }
+ RUSTFS_DOMAIN_NAME = name;
+ FIXED_BUCKET_NAME = fixedBucketName;
+ }
+
+ /** Validates the bucket host name, on non-Linux, if necessary. */
+ private static String validateBucketHost(String bucketName) {
+ if (FIXED_BUCKET_NAME != null) {
+ String test = bucketName + ".localhost";
+ try {
+ InetAddress ignored = InetAddress.getByName(test);
+ } catch (UnknownHostException e) {
+ LOGGER.warn(
+ "Could not resolve '{}',\n Please add the line \n '127.0.0.1
{}'\n to your local '/etc/hosts' file.\n Tests are expected to fail unless
name resolution works.",
+ test,
+ test);
+ }
+ }
+ return bucketName;
+ }
+
+ private final String accessKey;
+ private final String secretKey;
+ private final String bucket;
+
+ private String hostPort;
+ private String s3endpoint;
+ private S3Client s3;
+ private Optional<String> region;
+
+ @SuppressWarnings("unused")
+ public RustfsContainer() {
+ this(null, null, null, null, null);
+ }
+
+ @SuppressWarnings("resource")
+ public RustfsContainer(
+ String image, String accessKey, String secretKey, String bucket, String
region) {
+ super(
+ ContainerSpecHelper.containerSpecHelper("rustfs",
RustfsContainer.class)
+ .dockerImageName(image));
+ withNetworkAliases(randomString("rustfs"));
+ withLogConsumer(new
Slf4jLogConsumer(LoggerFactory.getLogger(RustfsContainer.class)));
+ // A fixed S3 API port is needed due to test-containers map a random port
to host and
+ // RustFS is very restrict on server domains.
+ // Anything not using port 443 port must define the port as part of
RUSTFS_SERVER_DOMAINS.
+ // More detail in https://github.com/rustfs/rustfs/issues/1593
+ addFixedExposedPort(S3_API_PORT, S3_API_PORT);
+ addExposedPort(CONSOLE_PORT);
+ this.accessKey = accessKey != null ? accessKey : randomString("access");
+ this.secretKey = secretKey != null ? secretKey : randomString("secret");
+ this.bucket =
+ bucket != null
+ ? validateBucketHost(bucket)
+ : (FIXED_BUCKET_NAME != null ? FIXED_BUCKET_NAME :
randomString("bucket"));
+ this.region = Optional.ofNullable(region);
+ withEnv(RUSTFS_ACCESS_KEY, this.accessKey);
+ withEnv(RUSTFS_SECRET_KEY, this.secretKey);
+ // S3 SDK encodes bucket names in host names - need to tell Rustfs which
domain to use
+ withEnv(RUSTFS_DOMAIN, RUSTFS_DOMAIN_NAME + ":" + S3_API_PORT);
+ setWaitStrategy(
+ new HttpWaitStrategy()
+ .forPort(CONSOLE_PORT)
+ .forPath(HEALTH_ENDPOINT)
+ .withStartupTimeout(Duration.ofMinutes(2)));
+ }
+
+ public RustfsContainer withRegion(String region) {
+ this.region = Optional.of(region);
+ return this;
+ }
+
+ private static String randomString(String prefix) {
+ return prefix + "-" + Base58.randomString(6).toLowerCase(Locale.ROOT);
+ }
+
+ @Override
+ public String hostPort() {
+ Preconditions.checkState(hostPort != null, "Container not yet started");
+ return hostPort;
+ }
+
+ @Override
+ public String accessKey() {
+ return accessKey;
+ }
+
+ @Override
+ public String secretKey() {
+ return secretKey;
+ }
+
+ @Override
+ public String bucket() {
+ return bucket;
+ }
+
+ @Override
+ public Optional<String> region() {
+ return region;
+ }
+
+ @Override
+ public String s3endpoint() {
+ Preconditions.checkState(s3endpoint != null, "Container not yet started");
+ return s3endpoint;
+ }
+
+ @Override
+ public S3Client s3Client() {
+ Preconditions.checkState(s3 != null, "Container not yet started");
+ return s3;
+ }
+
+ @Override
+ public Map<String, String> icebergProperties() {
+ Map<String, String> props = new HashMap<>();
+ props.put("s3.access-key-id", accessKey());
+ props.put("s3.secret-access-key", secretKey());
+ props.put("s3.endpoint", s3endpoint());
+ props.put("http-client.type", "urlconnection");
+ region().ifPresent(r -> props.put("client.region", r));
+ return props;
+ }
+
+ @Override
+ public Map<String, String> hadoopConfig() {
+ Map<String, String> r = new HashMap<>();
+ r.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ r.put("fs.s3a.access.key", accessKey());
+ r.put("fs.s3a.secret.key", secretKey());
+ r.put("fs.s3a.endpoint", s3endpoint());
+ return r;
+ }
+
+ @Override
+ public URI s3BucketUri(String path) {
+ return s3BucketUri("s3", path);
+ }
+
+ public URI s3BucketUri(String scheme, String path) {
+ Preconditions.checkState(bucket != null, "Container not yet started");
+ return URI.create(String.format("%s://%s/", scheme, bucket)).resolve(path);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+
+ this.hostPort = RUSTFS_DOMAIN_NAME + ":" + getMappedPort(S3_API_PORT);
+ this.s3endpoint = String.format("http://%s/", hostPort);
+
+ this.s3 = createS3Client();
+
this.s3.createBucket(CreateBucketRequest.builder().bucket(bucket()).build());
+ }
+
+ @Override
+ public void close() {
+ stop();
+ }
+
+ @Override
+ public void stop() {
+ try {
+ if (s3 != null) {
+ s3.close();
+ }
+ } finally {
+ s3 = null;
+ super.stop();
+ }
+ }
+
+ private S3Client createS3Client() {
+ return S3Client.builder()
+ .httpClientBuilder(UrlConnectionHttpClient.builder())
+ .applyMutation(builder ->
builder.endpointOverride(URI.create(s3endpoint())))
+ .applyMutation(builder -> region.ifPresent(r ->
builder.region(Region.of(r))))
+ // .serviceConfiguration(s3Configuration(s3PathStyleAccess,
s3UseArnRegionEnabled))
+ // credentialsProvider(s3AccessKeyId, s3SecretAccessKey,
s3SessionToken)
+ .credentialsProvider(
+
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey(),
secretKey())))
+ .build();
+ }
+}
diff --git
a/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
new file mode 100644
index 000000000..becd226f6
--- /dev/null
+++
b/tools/rustfs-testcontainer/src/main/java/org/apache/polaris/test/rustfs/RustfsExtension.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.test.rustfs;
+
+import static java.lang.String.format;
+import static
org.junit.jupiter.api.extension.ConditionEvaluationResult.disabled;
+import static
org.junit.jupiter.api.extension.ConditionEvaluationResult.enabled;
+import static
org.junit.platform.commons.util.AnnotationUtils.findAnnotatedFields;
+import static org.junit.platform.commons.util.ReflectionUtils.makeAccessible;
+
+import java.lang.reflect.Field;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.platform.commons.util.AnnotationUtils;
+import org.junit.platform.commons.util.ExceptionUtils;
+import org.junit.platform.commons.util.ReflectionUtils;
+
+/**
+ * JUnit extension that provides a Rustfs container configured with a single
bucket.
+ *
+ * <p>Provides instances of {@link RustfsAccess} via instance or static fields
or parameters
+ * annotated with {@link Rustfs}.
+ */
+// CODE_COPIED_TO_POLARIS from Project Nessie 0.104.2
+public class RustfsExtension
+ implements BeforeAllCallback, BeforeEachCallback, ParameterResolver,
ExecutionCondition {
+ private static final ExtensionContext.Namespace NAMESPACE =
+ ExtensionContext.Namespace.create(RustfsExtension.class);
+
+ @Override
+ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext
context) {
+ if (OS.current() == OS.LINUX) {
+ return enabled("Running on Linux");
+ }
+ if (OS.current() == OS.MAC
+ && System.getenv("CI_MAC") == null
+ && RustfsContainer.canRunOnMacOs()) {
+ // Disable tests on GitHub Actions
+ return enabled("Running on macOS locally");
+ }
+ return disabled(format("Disabled on %s", OS.current().name()));
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext context) {
+ Class<?> testClass = context.getRequiredTestClass();
+
+ findAnnotatedFields(testClass, Rustfs.class, ReflectionUtils::isStatic)
+ .forEach(field -> injectField(context, field));
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ Class<?> testClass = context.getRequiredTestClass();
+
+ findAnnotatedFields(testClass, Rustfs.class, ReflectionUtils::isNotStatic)
+ .forEach(field -> injectField(context, field));
+ }
+
+ private void injectField(ExtensionContext context, Field field) {
+ try {
+ Rustfs rustfs =
+ AnnotationUtils.findAnnotation(field, Rustfs.class)
+ .orElseThrow(IllegalStateException::new);
+
+ RustfsAccess container =
+ context
+ .getStore(NAMESPACE)
+ .getOrComputeIfAbsent(
+ field.toString(), x -> createContainer(rustfs),
RustfsAccess.class);
+
+ makeAccessible(field).set(context.getTestInstance().orElse(null),
container);
+ } catch (Throwable t) {
+ ExceptionUtils.throwAsUncheckedException(t);
+ }
+ }
+
+ @Override
+ public boolean supportsParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext)
+ throws ParameterResolutionException {
+ if (parameterContext.findAnnotation(Rustfs.class).isEmpty()) {
+ return false;
+ }
+ return
parameterContext.getParameter().getType().isAssignableFrom(RustfsAccess.class);
+ }
+
+ @Override
+ public Object resolveParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext)
+ throws ParameterResolutionException {
+ return extensionContext
+ .getStore(NAMESPACE)
+ .getOrComputeIfAbsent(
+ RustfsExtension.class.getName() + '#' +
parameterContext.getParameter().getName(),
+ k -> {
+ Rustfs rustfs =
parameterContext.findAnnotation(Rustfs.class).get();
+ return createContainer(rustfs);
+ },
+ RustfsAccess.class);
+ }
+
+ private RustfsAccess createContainer(Rustfs rustfs) {
+ String accessKey = nonDefault(rustfs.accessKey());
+ String secretKey = nonDefault(rustfs.secretKey());
+ String bucket = nonDefault(rustfs.bucket());
+ String region = nonDefault(rustfs.region());
+ RustfsContainer container =
+ new RustfsContainer(null, accessKey, secretKey, bucket,
region).withStartupAttempts(5);
+ container.start();
+ return container;
+ }
+
+ private static String nonDefault(String s) {
+ return s.equals(Rustfs.DEFAULT) ? null : s;
+ }
+}
diff --git
a/tools/rustfs-testcontainer/src/main/resources/org/apache/polaris/test/rustfs/Dockerfile-rustfs-version
b/tools/rustfs-testcontainer/src/main/resources/org/apache/polaris/test/rustfs/Dockerfile-rustfs-version
new file mode 100644
index 000000000..194d5402b
--- /dev/null
+++
b/tools/rustfs-testcontainer/src/main/resources/org/apache/polaris/test/rustfs/Dockerfile-rustfs-version
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Dockerfile to provide the image name and tag to a test.
+# Version is managed by Renovate - do not edit.
+FROM rustfs/rustfs:1.0.0-alpha.82
\ No newline at end of file