This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
     new f5feb95e7  Eliminate usage of CatalogHandlers; introduce CatalogHandlerUtils (#1576)
f5feb95e7 is described below

commit f5feb95e76928aad12bf7ee80f49b1d2a3e99558
Author: Eric Maynard <eric.maynard+...@snowflake.com>
AuthorDate: Thu May 15 09:30:42 2025 -0700

    Eliminate usage of CatalogHandlers; introduce CatalogHandlerUtils (#1576)

    * compiles
    * wire it up
    * De-static, add config
    * autolint
    * license entry
    * cut over
    * pull
    * revert helm doc
    * autolint
---
 LICENSE                                            |   1 +
 .../polaris/core/config/FeatureConfiguration.java  |   8 +
 .../quarkus/admin/PolarisAuthzTestBase.java        |   2 +
 .../catalog/IcebergCatalogHandlerAuthzTest.java    |   9 +-
 .../catalog/iceberg/CatalogHandlerUtils.java       | 614 +++++++++++++++++++++
 .../catalog/iceberg/IcebergCatalogAdapter.java     |  14 +-
 .../catalog/iceberg/IcebergCatalogHandler.java     |  62 ++-
 .../org/apache/polaris/service/TestServices.java   |   7 +-
 8 files changed, 679 insertions(+), 38 deletions(-)

diff --git a/LICENSE b/LICENSE
index f603db922..38b90ce86 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,6 +219,7 @@ This product includes code from Apache Iceberg.
 * spec/polaris-catalog-apis/oauth-tokens-api.yaml
 * integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
 * service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+* service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
 * plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
 * plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
 * plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 39a342e95..f06bfad45 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -261,4 +261,12 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> {
           .description("The list of supported catalog connection types for federation")
           .defaultValue(List.of(ConnectionType.ICEBERG_REST.name()))
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> ICEBERG_COMMIT_MAX_RETRIES =
+      PolarisConfiguration.<Integer>builder()
+          .key("ICEBERG_COMMIT_MAX_RETRIES")
+          .catalogConfig("polaris.config.iceberg-commit-max-retries")
+          .description("The max number of times to try committing to an Iceberg table")
+          .defaultValue(4)
+          .buildFeatureConfiguration();
 }
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 37dcc04e5..8abb4c041 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -81,6 +81,7 @@ import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
 import org.apache.polaris.service.catalog.generic.PolarisGenericTableCatalog;
+import 
org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.policy.PolicyCatalog; @@ -192,6 +193,7 @@ public abstract class PolarisAuthzTestBase { @Inject protected Clock clock; @Inject protected FileIOFactory fileIOFactory; @Inject protected PolarisEventListener polarisEventListener; + @Inject protected CatalogHandlerUtils catalogHandlerUtils; protected IcebergCatalog baseCatalog; protected PolarisGenericTableCatalog genericTableCatalog; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java index cbfaa41f3..926e9791e 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java @@ -116,7 +116,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { factory, catalogName, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); } /** @@ -256,7 +257,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { callContextCatalogFactory, CATALOG_NAME, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); // a variety of actions are all disallowed because the principal's credentials must be rotated doTestInsufficientPrivileges( @@ -290,7 +292,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { callContextCatalogFactory, CATALOG_NAME, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); doTestSufficientPrivilegeSets( List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)), diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java new file mode 100644 index 000000000..39f9b3352 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.catalog.iceberg; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetadataTable; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.CreateNamespaceResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.view.BaseView; +import org.apache.iceberg.view.SQLViewRepresentation; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewBuilder; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewOperations; +import org.apache.iceberg.view.ViewRepresentation; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; + +/** + * CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains a collection of + * utilities related to 
managing Iceberg entities + */ +@ApplicationScoped +public class CatalogHandlerUtils { + private static final Schema EMPTY_SCHEMA = new Schema(); + private static final String INITIAL_PAGE_TOKEN = ""; + + private final PolarisCallContext polarisCallContext; + private final PolarisConfigurationStore configurationStore; + + @Inject + public CatalogHandlerUtils( + PolarisCallContext polarisCallContext, PolarisConfigurationStore configurationStore) { + this.polarisCallContext = polarisCallContext; + this.configurationStore = configurationStore; + } + + /** + * Exception used to avoid retrying commits when assertions fail. + * + * <p>When a REST assertion fails, it will throw CommitFailedException to send back to the client. + * But the assertion checks happen in the block that is retried if {@link + * TableOperations#commit(TableMetadata, TableMetadata)} throws CommitFailedException. This is + * used to avoid retries for assertion failures, which are unwrapped and rethrown outside of the + * commit loop. + */ + private static class ValidationFailureException extends RuntimeException { + private final CommitFailedException wrapped; + + private ValidationFailureException(CommitFailedException cause) { + super(cause); + this.wrapped = cause; + } + + public CommitFailedException wrapped() { + return wrapped; + } + } + + private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken, int pageSize) { + int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken); + if (pageStart >= list.size()) { + return Pair.of(Collections.emptyList(), null); + } + + int end = Math.min(pageStart + pageSize, list.size()); + List<T> subList = list.subList(pageStart, end); + String nextPageToken = end >= list.size() ? null : String.valueOf(end); + + return Pair.of(subList, nextPageToken); + } + + public ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, Namespace parent) { + List<Namespace> results; + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); + } + + return ListNamespacesResponse.builder().addAll(results).build(); + } + + public ListNamespacesResponse listNamespaces( + SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) { + List<Namespace> results; + + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); + } + + Pair<List<Namespace>, String> page = paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListNamespacesResponse.builder() + .addAll(page.first()) + .nextPageToken(page.second()) + .build(); + } + + public CreateNamespaceResponse createNamespace( + SupportsNamespaces catalog, CreateNamespaceRequest request) { + Namespace namespace = request.namespace(); + catalog.createNamespace(namespace, request.properties()); + return CreateNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(catalog.loadNamespaceMetadata(namespace)) + .build(); + } + + public void namespaceExists(SupportsNamespaces catalog, Namespace namespace) { + if (!catalog.namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + public GetNamespaceResponse loadNamespace(SupportsNamespaces catalog, Namespace namespace) { + Map<String, String> properties = catalog.loadNamespaceMetadata(namespace); + return GetNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(properties) + .build(); + } + + public void 
dropNamespace(SupportsNamespaces catalog, Namespace namespace) { + boolean dropped = catalog.dropNamespace(namespace); + if (!dropped) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + public UpdateNamespacePropertiesResponse updateNamespaceProperties( + SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) { + request.validate(); + + Set<String> removals = Sets.newHashSet(request.removals()); + Map<String, String> updates = request.updates(); + + Map<String, String> startProperties = catalog.loadNamespaceMetadata(namespace); + Set<String> missing = Sets.difference(removals, startProperties.keySet()); + + if (!updates.isEmpty()) { + catalog.setProperties(namespace, updates); + } + + if (!removals.isEmpty()) { + // remove the original set just in case there was an update just after loading properties + catalog.removeProperties(namespace, removals); + } + + return UpdateNamespacePropertiesResponse.builder() + .addMissing(missing) + .addUpdated(updates.keySet()) + .addRemoved(Sets.difference(removals, missing)) + .build(); + } + + public ListTablesResponse listTables(Catalog catalog, Namespace namespace) { + List<TableIdentifier> idents = catalog.listTables(namespace); + return ListTablesResponse.builder().addAll(idents).build(); + } + + public ListTablesResponse listTables( + Catalog catalog, Namespace namespace, String pageToken, String pageSize) { + List<TableIdentifier> results = catalog.listTables(namespace); + + Pair<List<TableIdentifier>, String> page = + paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); + } + + public LoadTableResponse stageTableCreate( + Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + if (catalog.tableExists(ident)) { + throw new AlreadyExistsException("Table already exists: %s", ident); + } + + Map<String, String> properties = Maps.newHashMap(); + properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); + properties.putAll(request.properties()); + + String location; + if (request.location() != null) { + location = request.location(); + } else { + location = + catalog + .buildTable(ident, request.schema()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(properties) + .createTransaction() + .table() + .location(); + } + + TableMetadata metadata = + TableMetadata.newTableMetadata( + request.schema(), + request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(), + request.writeOrder() != null ? 
request.writeOrder() : SortOrder.unsorted(), + location, + properties); + + return LoadTableResponse.builder().withTableMetadata(metadata).build(); + } + + public LoadTableResponse createTable( + Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + Table table = + catalog + .buildTable(ident, request.schema()) + .withLocation(request.location()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(request.properties()) + .create(); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public LoadTableResponse registerTable( + Catalog catalog, Namespace namespace, RegisterTableRequest request) { + request.validate(); + + TableIdentifier identifier = TableIdentifier.of(namespace, request.name()); + Table table = catalog.registerTable(identifier, request.metadataLocation()); + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public void dropTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident, false); + if (!dropped) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public void purgeTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident, true); + if (!dropped) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public void tableExists(Catalog catalog, TableIdentifier ident) { + boolean exists = catalog.tableExists(ident); + if (!exists) { + throw new NoSuchTableException("Table does not exist: %s", ident); + } + } + + public LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { + Table table = catalog.loadTable(ident); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } else if (table instanceof BaseMetadataTable) { + // metadata tables are loaded on the client side, return NoSuchTableException for now + throw new NoSuchTableException("Table does not exist: %s", ident.toString()); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public LoadTableResponse updateTable( + Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + TableMetadata finalMetadata; + if (isCreate(request)) { + // this is a hacky way to get TableOperations for an uncommitted table + Transaction transaction = + catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction(); + if (transaction instanceof BaseTransaction) { + BaseTransaction baseTransaction = (BaseTransaction) transaction; + finalMetadata = create(baseTransaction.underlyingOps(), request); + } else { + throw new IllegalStateException( + "Cannot wrap catalog that does not produce BaseTransaction"); + } + + } else { + Table table = catalog.loadTable(ident); + if (table instanceof BaseTable) { + TableOperations ops = ((BaseTable) table).operations(); + finalMetadata = commit(ops, request); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable"); + } + } + + return LoadTableResponse.builder().withTableMetadata(finalMetadata).build(); + } + + public void renameTable(Catalog catalog, RenameTableRequest request) { + catalog.renameTable(request.source(), request.destination()); + } + + private boolean isCreate(UpdateTableRequest request) { + boolean isCreate = + request.requirements().stream() + .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); + + if (isCreate) { + List<UpdateRequirement> invalidRequirements = + request.requirements().stream() + .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) + .collect(Collectors.toList()); + Preconditions.checkArgument( + invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); + } + + return isCreate; + } + + private TableMetadata create(TableOperations ops, UpdateTableRequest request) { + // the only valid requirement is that the table will be created + request.requirements().forEach(requirement -> requirement.validate(ops.current())); + Optional<Integer> formatVersion = + request.updates().stream() + .filter(update -> update instanceof UpgradeFormatVersion) + .map(update -> ((UpgradeFormatVersion) update).formatVersion()) + .findFirst(); + + TableMetadata.Builder builder = + formatVersion.map(TableMetadata::buildFromEmpty).orElseGet(TableMetadata::buildFromEmpty); + request.updates().forEach(update -> update.applyTo(builder)); + // create transactions do not retry. if the table exists, retrying is not a solution + ops.commit(null, builder.build()); + + return ops.current(); + } + + protected TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(maxCommitRetries()) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + TableMetadata base = isRetry.get() ? 
taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + TableMetadata updated = metadataBuilder.build(); + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped(); + } + + return ops.current(); + } + + private BaseView asBaseView(View view) { + Preconditions.checkState( + view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView"); + return (BaseView) view; + } + + public ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) { + return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build(); + } + + public ListTablesResponse listViews( + ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) { + List<TableIdentifier> results = catalog.listViews(namespace); + + Pair<List<TableIdentifier>, String> page = + paginate(results, pageToken, Integer.parseInt(pageSize)); + + return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); + } + + public LoadViewResponse createView( + ViewCatalog catalog, Namespace namespace, CreateViewRequest request) { + request.validate(); + + ViewBuilder viewBuilder = + catalog + .buildView(TableIdentifier.of(namespace, request.name())) + .withSchema(request.schema()) + .withProperties(request.properties()) + .withDefaultNamespace(request.viewVersion().defaultNamespace()) + .withDefaultCatalog(request.viewVersion().defaultCatalog()) + .withLocation(request.location()); + + Set<String> unsupportedRepresentations = + request.viewVersion().representations().stream() + .filter(r -> !(r instanceof SQLViewRepresentation)) + .map(ViewRepresentation::type) + .collect(Collectors.toSet()); + + if (!unsupportedRepresentations.isEmpty()) { + throw new IllegalStateException( + String.format("Found unsupported view representations: %s", unsupportedRepresentations)); + } + + request.viewVersion().representations().stream() + .filter(SQLViewRepresentation.class::isInstance) + .map(SQLViewRepresentation.class::cast) + .forEach(r -> viewBuilder.withQuery(r.dialect(), r.sql())); + + View view = viewBuilder.create(); + + return viewResponse(view); + } + + private LoadViewResponse viewResponse(View view) { + ViewMetadata metadata = asBaseView(view).operations().current(); + return ImmutableLoadViewResponse.builder() + .metadata(metadata) + .metadataLocation(metadata.metadataFileLocation()) + .build(); + } + + public void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) { + if (!catalog.viewExists(viewIdentifier)) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } + } + + public LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + View view = catalog.loadView(viewIdentifier); + return viewResponse(view); + } + + public LoadViewResponse updateView( + ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) { + View view = catalog.loadView(ident); + ViewMetadata metadata = 
commit(asBaseView(view).operations(), request); + + return ImmutableLoadViewResponse.builder() + .metadata(metadata) + .metadataLocation(metadata.metadataFileLocation()) + .build(); + } + + public void renameView(ViewCatalog catalog, RenameTableRequest request) { + catalog.renameView(request.source(), request.destination()); + } + + public void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) { + boolean dropped = catalog.dropView(viewIdentifier); + if (!dropped) { + throw new NoSuchViewException("View does not exist: %s", viewIdentifier); + } + } + + protected ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(maxCommitRetries()) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + ViewMetadata.Builder metadataBuilder = ViewMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + ViewMetadata updated = metadataBuilder.build(); + + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped(); + } + + return ops.current(); + } + + private int maxCommitRetries() { + return configurationStore.getConfiguration( + polarisCallContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 4484dc129..4c9c527d6 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Function; -import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.BadRequestException; @@ -85,9 +84,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link IcebergRestCatalogApiService} implementation that delegates operations to {@link - * org.apache.iceberg.rest.CatalogHandlers} after finding the appropriate {@link Catalog} for the - * current {@link RealmContext}. + * An adapter between generated service types like `IcebergRestCatalogApiService` and + * `IcebergCatalogHandler`. 
*/ @RequestScoped public class IcebergCatalogAdapter @@ -140,6 +138,7 @@ public class IcebergCatalogAdapter private final PolarisAuthorizer polarisAuthorizer; private final CatalogPrefixParser prefixParser; private final ReservedProperties reservedProperties; + private final CatalogHandlerUtils catalogHandlerUtils; @Inject public IcebergCatalogAdapter( @@ -151,7 +150,8 @@ public class IcebergCatalogAdapter UserSecretsManager userSecretsManager, PolarisAuthorizer polarisAuthorizer, CatalogPrefixParser prefixParser, - ReservedProperties reservedProperties) { + ReservedProperties reservedProperties, + CatalogHandlerUtils catalogHandlerUtils) { this.realmContext = realmContext; this.callContext = callContext; this.catalogFactory = catalogFactory; @@ -161,6 +161,7 @@ public class IcebergCatalogAdapter this.polarisAuthorizer = polarisAuthorizer; this.prefixParser = prefixParser; this.reservedProperties = reservedProperties; + this.catalogHandlerUtils = catalogHandlerUtils; // FIXME: This is a hack to set the current context for downstream calls. CallContext.setCurrentContext(callContext); @@ -199,7 +200,8 @@ public class IcebergCatalogAdapter catalogFactory, catalogName, polarisAuthorizer, - reservedProperties); + reservedProperties, + catalogHandlerUtils); } @Override diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index aca12eb6f..972f6871e 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -55,7 +55,6 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.rest.CatalogHandlers; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.credentials.ImmutableCredential; @@ -128,6 +127,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final UserSecretsManager userSecretsManager; private final CallContextCatalogFactory catalogFactory; private final ReservedProperties reservedProperties; + private final CatalogHandlerUtils catalogHandlerUtils; // Catalog instance will be initialized after authorizing resolver successfully resolves // the catalog entity. 
@@ -147,12 +147,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
       CallContextCatalogFactory catalogFactory,
       String catalogName,
       PolarisAuthorizer authorizer,
-      ReservedProperties reservedProperties) {
+      ReservedProperties reservedProperties,
+      CatalogHandlerUtils catalogHandlerUtils) {
     super(callContext, entityManager, securityContext, catalogName, authorizer);
     this.metaStoreManager = metaStoreManager;
     this.userSecretsManager = userSecretsManager;
     this.catalogFactory = catalogFactory;
     this.reservedProperties = reservedProperties;
+    this.catalogHandlerUtils = catalogHandlerUtils;
   }

   /**
@@ -188,7 +190,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else {
-      return CatalogHandlers.listNamespaces(
+      return catalogHandlerUtils.listNamespaces(
           namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
     }
   }
@@ -255,7 +257,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES;
     authorizeBasicNamespaceOperationOrThrow(op, parent);

-    return CatalogHandlers.listNamespaces(namespaceCatalog, parent);
+    return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent);
   }

   public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) {
@@ -291,7 +293,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
           .setProperties(filteredProperties)
           .build();
     } else {
-      return CatalogHandlers.createNamespace(namespaceCatalog, request);
+      return catalogHandlerUtils.createNamespace(namespaceCatalog, request);
     }
   }

@@ -305,7 +307,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_NAMESPACE_METADATA;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);

-    return CatalogHandlers.loadNamespace(namespaceCatalog, namespace);
+    return catalogHandlerUtils.loadNamespace(namespaceCatalog, namespace);
   }

   public void namespaceExists(Namespace namespace) {
@@ -320,14 +322,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     authorizeBasicNamespaceOperationOrThrow(op, namespace);

     // TODO: Just skip CatalogHandlers for this one maybe
-    CatalogHandlers.loadNamespace(namespaceCatalog, namespace);
+    catalogHandlerUtils.loadNamespace(namespaceCatalog, namespace);
   }

   public void dropNamespace(Namespace namespace) {
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_NAMESPACE;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);

-    CatalogHandlers.dropNamespace(namespaceCatalog, namespace);
+    catalogHandlerUtils.dropNamespace(namespaceCatalog, namespace);
   }

   public UpdateNamespacePropertiesResponse updateNamespaceProperties(
@@ -335,7 +337,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.UPDATE_NAMESPACE_PROPERTIES;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);

-    return CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request);
+    return catalogHandlerUtils.updateNamespaceProperties(namespaceCatalog, namespace, request);
   }

   public ListTablesResponse listTables(Namespace namespace, String pageToken, Integer pageSize) {
@@ -349,7 +351,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
.nextPageToken(results.pageToken.toTokenString()) .build(); } else { - return CatalogHandlers.listTables( + return catalogHandlerUtils.listTables( baseCatalog, namespace, pageToken, String.valueOf(pageSize)); } } @@ -358,7 +360,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return CatalogHandlers.listTables(baseCatalog, namespace); + return catalogHandlerUtils.listTables(baseCatalog, namespace); } /** @@ -391,7 +393,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .withWriteOrder(request.writeOrder()) .setProperties(reservedProperties.removeReservedProperties(request.properties())) .build(); - return CatalogHandlers.createTable(baseCatalog, namespace, requestWithoutReservedProperties); + return catalogHandlerUtils.createTable( + baseCatalog, namespace, requestWithoutReservedProperties); } /** @@ -553,7 +556,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab authorizeCreateTableLikeUnderNamespaceOperationOrThrow( op, TableIdentifier.of(namespace, request.name())); - return CatalogHandlers.registerTable(baseCatalog, namespace, request); + return catalogHandlerUtils.registerTable(baseCatalog, namespace, request); } public boolean sendNotification(TableIdentifier identifier, NotificationRequest request) { @@ -646,7 +649,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab } } - LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, tableIdentifier); + LoadTableResponse rawResponse = catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); return Optional.of(filterResponseToSnapshots(rawResponse, snapshots)); } @@ -824,7 +827,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateTable( + baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public LoadTableResponse updateTableForStagedCreate( @@ -841,7 +845,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateTable( + baseCatalog, tableIdentifier, applyUpdateFilters(request)); } public void dropTableWithoutPurge(TableIdentifier tableIdentifier) { @@ -849,7 +854,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab authorizeBasicTableLikeOperationOrThrow( op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); - CatalogHandlers.dropTable(baseCatalog, tableIdentifier); + catalogHandlerUtils.dropTable(baseCatalog, tableIdentifier); } public void dropTableWithPurge(TableIdentifier tableIdentifier) { @@ -866,7 +871,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot drop table on static-facade external catalogs."); } - CatalogHandlers.purgeTable(baseCatalog, tableIdentifier); + catalogHandlerUtils.purgeTable(baseCatalog, tableIdentifier); } public 
void tableExists(TableIdentifier tableIdentifier) { @@ -875,7 +880,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); // TODO: Just skip CatalogHandlers for this one maybe - CatalogHandlers.loadTable(baseCatalog, tableIdentifier); + catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier); } public void renameTable(RenameTableRequest request) { @@ -892,7 +897,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot rename table on static-facade external catalogs."); } - CatalogHandlers.renameTable(baseCatalog, request); + catalogHandlerUtils.renameTable(baseCatalog, request); } public void commitTransaction(CommitTransactionRequest commitTransactionRequest) { @@ -1012,7 +1017,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .nextPageToken(results.pageToken.toTokenString()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { - return CatalogHandlers.listViews(viewCatalog, namespace, pageToken, String.valueOf(pageSize)); + return catalogHandlerUtils.listViews( + viewCatalog, namespace, pageToken, String.valueOf(pageSize)); } else { throw new BadRequestException( "Unsupported operation: listViews with baseCatalog type: %s", @@ -1024,7 +1030,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS; authorizeBasicNamespaceOperationOrThrow(op, namespace); - return CatalogHandlers.listViews(viewCatalog, namespace); + return catalogHandlerUtils.listViews(viewCatalog, namespace); } public LoadViewResponse createView(Namespace namespace, CreateViewRequest request) { @@ -1041,14 +1047,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot create view on static-facade external catalogs."); } - return CatalogHandlers.createView(viewCatalog, namespace, request); + return catalogHandlerUtils.createView(viewCatalog, namespace, request); } public LoadViewResponse loadView(TableIdentifier viewIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_VIEW; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier); - return CatalogHandlers.loadView(viewCatalog, viewIdentifier); + return catalogHandlerUtils.loadView(viewCatalog, viewIdentifier); } public LoadViewResponse replaceView(TableIdentifier viewIdentifier, UpdateTableRequest request) { @@ -1064,14 +1070,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot replace view on static-facade external catalogs."); } - return CatalogHandlers.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request)); + return catalogHandlerUtils.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request)); } public void dropView(TableIdentifier viewIdentifier) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_VIEW; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier); - CatalogHandlers.dropView(viewCatalog, viewIdentifier); + catalogHandlerUtils.dropView(viewCatalog, viewIdentifier); } public void viewExists(TableIdentifier viewIdentifier) { @@ -1079,7 +1085,7 @@ public class 
IcebergCatalogHandler extends CatalogHandler implements AutoCloseab authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier); // TODO: Just skip CatalogHandlers for this one maybe - CatalogHandlers.loadView(viewCatalog, viewIdentifier); + catalogHandlerUtils.loadView(viewCatalog, viewIdentifier); } public void renameView(RenameTableRequest request) { @@ -1096,7 +1102,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot rename view on static-facade external catalogs."); } - CatalogHandlers.renameView(viewCatalog, request); + catalogHandlerUtils.renameView(viewCatalog, request); } private @Nonnull LoadTableResponse filterResponseToSnapshots( diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 34a3caf45..4301fe993 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -49,6 +49,7 @@ import org.apache.polaris.service.admin.api.PolarisCatalogsApi; import org.apache.polaris.service.catalog.DefaultCatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi; import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApi; +import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils; import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; @@ -191,6 +192,9 @@ public record TestServices( ReservedProperties reservedProperties = ReservedProperties.NONE; + CatalogHandlerUtils catalogHandlerUtils = + new CatalogHandlerUtils(callContext.getPolarisCallContext(), configurationStore); + IcebergCatalogAdapter service = new IcebergCatalogAdapter( realmContext, @@ -201,7 +205,8 @@ public record TestServices( userSecretsManager, authorizer, new DefaultCatalogPrefixParser(), - reservedProperties); + reservedProperties, + catalogHandlerUtils); IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service); IcebergRestConfigurationApi restConfigurationApi = new IcebergRestConfigurationApi(service);
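
For readers skimming the diff above, a brief illustration of the call-pattern change: code that previously invoked the static org.apache.iceberg.rest.CatalogHandlers methods now goes through a CatalogHandlerUtils instance, whose table and view commit path retries up to the per-catalog ICEBERG_COMMIT_MAX_RETRIES setting (catalog config key "polaris.config.iceberg-commit-max-retries", default 4) rather than a constant baked into Iceberg. The sketch below is not part of the commit; the CatalogHandlerUtils constructor and updateTable signature are taken from the diff, while the wrapper class, method, and variable names are illustrative only and mirror how TestServices wires the instance by hand.

import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;

// Illustrative only; in the service CatalogHandlerUtils is @ApplicationScoped and injected.
class CatalogHandlerUtilsUsageSketch {

  LoadTableResponse commitWithConfigurableRetries(
      PolarisCallContext polarisCallContext,
      PolarisConfigurationStore configurationStore,
      Catalog catalog,
      TableIdentifier ident,
      UpdateTableRequest request) {
    // Before this commit: org.apache.iceberg.rest.CatalogHandlers.updateTable(catalog, ident, request),
    // which retries a fixed number of times. The Polaris-owned copy instead reads
    // FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES for the current catalog.
    CatalogHandlerUtils catalogHandlerUtils =
        new CatalogHandlerUtils(polarisCallContext, configurationStore);
    return catalogHandlerUtils.updateTable(catalog, ident, request);
  }
}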