This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new f5feb95e7 Eliminate usage of CatalogHandlers; introduce CatalogHandlerUtils (#1576)
f5feb95e7 is described below

commit f5feb95e76928aad12bf7ee80f49b1d2a3e99558
Author: Eric Maynard <eric.maynard+...@snowflake.com>
AuthorDate: Thu May 15 09:30:42 2025 -0700

    Eliminate usage of CatalogHandlers; introduce CatalogHandlerUtils (#1576)
    
    * compiles
    
    * wire it up
    
    * De-static, add config
    
    * autolint
    
    * license entry
    
    * cut over
    
    * pull
    
    * revert helm doc
    
    * autolint
---
 LICENSE                                            |   1 +
 .../polaris/core/config/FeatureConfiguration.java  |   8 +
 .../quarkus/admin/PolarisAuthzTestBase.java        |   2 +
 .../catalog/IcebergCatalogHandlerAuthzTest.java    |   9 +-
 .../catalog/iceberg/CatalogHandlerUtils.java       | 614 +++++++++++++++++++++
 .../catalog/iceberg/IcebergCatalogAdapter.java     |  14 +-
 .../catalog/iceberg/IcebergCatalogHandler.java     |  62 ++-
 .../org/apache/polaris/service/TestServices.java   |   7 +-
 8 files changed, 679 insertions(+), 38 deletions(-)

diff --git a/LICENSE b/LICENSE
index f603db922..38b90ce86 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,6 +219,7 @@ This product includes code from Apache Iceberg.
 * spec/polaris-catalog-apis/oauth-tokens-api.yaml
 * integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
 * service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+* service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
 * plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
 * plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
 * plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 39a342e95..f06bfad45 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -261,4 +261,12 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> {
           .description("The list of supported catalog connection types for 
federation")
           .defaultValue(List.of(ConnectionType.ICEBERG_REST.name()))
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> ICEBERG_COMMIT_MAX_RETRIES 
=
+      PolarisConfiguration.<Integer>builder()
+          .key("ICEBERG_COMMIT_MAX_RETRIES")
+          .catalogConfig("polaris.config.iceberg-commit-max-retries")
+          .description("The max number of times to try committing to an 
Iceberg table")
+          .defaultValue(4)
+          .buildFeatureConfiguration();
 }
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 37dcc04e5..8abb4c041 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -81,6 +81,7 @@ import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
 import org.apache.polaris.service.catalog.generic.PolarisGenericTableCatalog;
+import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.policy.PolicyCatalog;
@@ -192,6 +193,7 @@ public abstract class PolarisAuthzTestBase {
   @Inject protected Clock clock;
   @Inject protected FileIOFactory fileIOFactory;
   @Inject protected PolarisEventListener polarisEventListener;
+  @Inject protected CatalogHandlerUtils catalogHandlerUtils;
 
   protected IcebergCatalog baseCatalog;
   protected PolarisGenericTableCatalog genericTableCatalog;
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
index cbfaa41f3..926e9791e 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -116,7 +116,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase {
         factory,
         catalogName,
         polarisAuthorizer,
-        reservedProperties);
+        reservedProperties,
+        catalogHandlerUtils);
   }
 
   /**
@@ -256,7 +257,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase {
             callContextCatalogFactory,
             CATALOG_NAME,
             polarisAuthorizer,
-            reservedProperties);
+            reservedProperties,
+            catalogHandlerUtils);
 
     // a variety of actions are all disallowed because the principal's 
credentials must be rotated
     doTestInsufficientPrivileges(
@@ -290,7 +292,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase {
             callContextCatalogFactory,
             CATALOG_NAME,
             polarisAuthorizer,
-            reservedProperties);
+            reservedProperties,
+            catalogHandlerUtils);
 
     doTestSufficientPrivilegeSets(
         List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)),
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
new file mode 100644
index 000000000..39f9b3352
--- /dev/null
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.iceberg;
+
+import static 
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetadataTable;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.CreateViewRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.SQLViewRepresentation;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewBuilder;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewOperations;
+import org.apache.iceberg.view.ViewRepresentation;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+
+/**
+ * CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains 
a collection of
+ * utilities related to managing Iceberg entities
+ */
+@ApplicationScoped
+public class CatalogHandlerUtils {
+  private static final Schema EMPTY_SCHEMA = new Schema();
+  private static final String INITIAL_PAGE_TOKEN = "";
+
+  private final PolarisCallContext polarisCallContext;
+  private final PolarisConfigurationStore configurationStore;
+
+  @Inject
+  public CatalogHandlerUtils(
+      PolarisCallContext polarisCallContext, PolarisConfigurationStore 
configurationStore) {
+    this.polarisCallContext = polarisCallContext;
+    this.configurationStore = configurationStore;
+  }
+
+  /**
+   * Exception used to avoid retrying commits when assertions fail.
+   *
+   * <p>When a REST assertion fails, it will throw CommitFailedException to 
send back to the client.
+   * But the assertion checks happen in the block that is retried if {@link
+   * TableOperations#commit(TableMetadata, TableMetadata)} throws 
CommitFailedException. This is
+   * used to avoid retries for assertion failures, which are unwrapped and 
rethrown outside of the
+   * commit loop.
+   */
+  private static class ValidationFailureException extends RuntimeException {
+    private final CommitFailedException wrapped;
+
+    private ValidationFailureException(CommitFailedException cause) {
+      super(cause);
+      this.wrapped = cause;
+    }
+
+    public CommitFailedException wrapped() {
+      return wrapped;
+    }
+  }
+
+  private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken, 
int pageSize) {
+    int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : 
Integer.parseInt(pageToken);
+    if (pageStart >= list.size()) {
+      return Pair.of(Collections.emptyList(), null);
+    }
+
+    int end = Math.min(pageStart + pageSize, list.size());
+    List<T> subList = list.subList(pageStart, end);
+    String nextPageToken = end >= list.size() ? null : String.valueOf(end);
+
+    return Pair.of(subList, nextPageToken);
+  }
+
+  public ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, 
Namespace parent) {
+    List<Namespace> results;
+    if (parent.isEmpty()) {
+      results = catalog.listNamespaces();
+    } else {
+      results = catalog.listNamespaces(parent);
+    }
+
+    return ListNamespacesResponse.builder().addAll(results).build();
+  }
+
+  public ListNamespacesResponse listNamespaces(
+      SupportsNamespaces catalog, Namespace parent, String pageToken, String 
pageSize) {
+    List<Namespace> results;
+
+    if (parent.isEmpty()) {
+      results = catalog.listNamespaces();
+    } else {
+      results = catalog.listNamespaces(parent);
+    }
+
+    Pair<List<Namespace>, String> page = paginate(results, pageToken, 
Integer.parseInt(pageSize));
+
+    return ListNamespacesResponse.builder()
+        .addAll(page.first())
+        .nextPageToken(page.second())
+        .build();
+  }
+
+  public CreateNamespaceResponse createNamespace(
+      SupportsNamespaces catalog, CreateNamespaceRequest request) {
+    Namespace namespace = request.namespace();
+    catalog.createNamespace(namespace, request.properties());
+    return CreateNamespaceResponse.builder()
+        .withNamespace(namespace)
+        .setProperties(catalog.loadNamespaceMetadata(namespace))
+        .build();
+  }
+
+  public void namespaceExists(SupportsNamespaces catalog, Namespace namespace) 
{
+    if (!catalog.namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    }
+  }
+
+  public GetNamespaceResponse loadNamespace(SupportsNamespaces catalog, 
Namespace namespace) {
+    Map<String, String> properties = catalog.loadNamespaceMetadata(namespace);
+    return GetNamespaceResponse.builder()
+        .withNamespace(namespace)
+        .setProperties(properties)
+        .build();
+  }
+
+  public void dropNamespace(SupportsNamespaces catalog, Namespace namespace) {
+    boolean dropped = catalog.dropNamespace(namespace);
+    if (!dropped) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    }
+  }
+
+  public UpdateNamespacePropertiesResponse updateNamespaceProperties(
+      SupportsNamespaces catalog, Namespace namespace, 
UpdateNamespacePropertiesRequest request) {
+    request.validate();
+
+    Set<String> removals = Sets.newHashSet(request.removals());
+    Map<String, String> updates = request.updates();
+
+    Map<String, String> startProperties = 
catalog.loadNamespaceMetadata(namespace);
+    Set<String> missing = Sets.difference(removals, startProperties.keySet());
+
+    if (!updates.isEmpty()) {
+      catalog.setProperties(namespace, updates);
+    }
+
+    if (!removals.isEmpty()) {
+      // remove the original set just in case there was an update just after 
loading properties
+      catalog.removeProperties(namespace, removals);
+    }
+
+    return UpdateNamespacePropertiesResponse.builder()
+        .addMissing(missing)
+        .addUpdated(updates.keySet())
+        .addRemoved(Sets.difference(removals, missing))
+        .build();
+  }
+
+  public ListTablesResponse listTables(Catalog catalog, Namespace namespace) {
+    List<TableIdentifier> idents = catalog.listTables(namespace);
+    return ListTablesResponse.builder().addAll(idents).build();
+  }
+
+  public ListTablesResponse listTables(
+      Catalog catalog, Namespace namespace, String pageToken, String pageSize) 
{
+    List<TableIdentifier> results = catalog.listTables(namespace);
+
+    Pair<List<TableIdentifier>, String> page =
+        paginate(results, pageToken, Integer.parseInt(pageSize));
+
+    return 
ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
+  }
+
+  public LoadTableResponse stageTableCreate(
+      Catalog catalog, Namespace namespace, CreateTableRequest request) {
+    request.validate();
+
+    TableIdentifier ident = TableIdentifier.of(namespace, request.name());
+    if (catalog.tableExists(ident)) {
+      throw new AlreadyExistsException("Table already exists: %s", ident);
+    }
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("created-at", 
OffsetDateTime.now(ZoneOffset.UTC).toString());
+    properties.putAll(request.properties());
+
+    String location;
+    if (request.location() != null) {
+      location = request.location();
+    } else {
+      location =
+          catalog
+              .buildTable(ident, request.schema())
+              .withPartitionSpec(request.spec())
+              .withSortOrder(request.writeOrder())
+              .withProperties(properties)
+              .createTransaction()
+              .table()
+              .location();
+    }
+
+    TableMetadata metadata =
+        TableMetadata.newTableMetadata(
+            request.schema(),
+            request.spec() != null ? request.spec() : 
PartitionSpec.unpartitioned(),
+            request.writeOrder() != null ? request.writeOrder() : 
SortOrder.unsorted(),
+            location,
+            properties);
+
+    return LoadTableResponse.builder().withTableMetadata(metadata).build();
+  }
+
+  public LoadTableResponse createTable(
+      Catalog catalog, Namespace namespace, CreateTableRequest request) {
+    request.validate();
+
+    TableIdentifier ident = TableIdentifier.of(namespace, request.name());
+    Table table =
+        catalog
+            .buildTable(ident, request.schema())
+            .withLocation(request.location())
+            .withPartitionSpec(request.spec())
+            .withSortOrder(request.writeOrder())
+            .withProperties(request.properties())
+            .create();
+
+    if (table instanceof BaseTable) {
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .build();
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
+  }
+
+  public LoadTableResponse registerTable(
+      Catalog catalog, Namespace namespace, RegisterTableRequest request) {
+    request.validate();
+
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    Table table = catalog.registerTable(identifier, 
request.metadataLocation());
+    if (table instanceof BaseTable) {
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .build();
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
+  }
+
+  public void dropTable(Catalog catalog, TableIdentifier ident) {
+    boolean dropped = catalog.dropTable(ident, false);
+    if (!dropped) {
+      throw new NoSuchTableException("Table does not exist: %s", ident);
+    }
+  }
+
+  public void purgeTable(Catalog catalog, TableIdentifier ident) {
+    boolean dropped = catalog.dropTable(ident, true);
+    if (!dropped) {
+      throw new NoSuchTableException("Table does not exist: %s", ident);
+    }
+  }
+
+  public void tableExists(Catalog catalog, TableIdentifier ident) {
+    boolean exists = catalog.tableExists(ident);
+    if (!exists) {
+      throw new NoSuchTableException("Table does not exist: %s", ident);
+    }
+  }
+
+  public LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) {
+    Table table = catalog.loadTable(ident);
+
+    if (table instanceof BaseTable) {
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .build();
+    } else if (table instanceof BaseMetadataTable) {
+      // metadata tables are loaded on the client side, return 
NoSuchTableException for now
+      throw new NoSuchTableException("Table does not exist: %s", 
ident.toString());
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
+  }
+
+  public LoadTableResponse updateTable(
+      Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+    TableMetadata finalMetadata;
+    if (isCreate(request)) {
+      // this is a hacky way to get TableOperations for an uncommitted table
+      Transaction transaction =
+          catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+      if (transaction instanceof BaseTransaction) {
+        BaseTransaction baseTransaction = (BaseTransaction) transaction;
+        finalMetadata = create(baseTransaction.underlyingOps(), request);
+      } else {
+        throw new IllegalStateException(
+            "Cannot wrap catalog that does not produce BaseTransaction");
+      }
+
+    } else {
+      Table table = catalog.loadTable(ident);
+      if (table instanceof BaseTable) {
+        TableOperations ops = ((BaseTable) table).operations();
+        finalMetadata = commit(ops, request);
+      } else {
+        throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");
+      }
+    }
+
+    return 
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+  }
+
+  public void renameTable(Catalog catalog, RenameTableRequest request) {
+    catalog.renameTable(request.source(), request.destination());
+  }
+
+  private boolean isCreate(UpdateTableRequest request) {
+    boolean isCreate =
+        request.requirements().stream()
+            
.anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+
+    if (isCreate) {
+      List<UpdateRequirement> invalidRequirements =
+          request.requirements().stream()
+              .filter(req -> !(req instanceof 
UpdateRequirement.AssertTableDoesNotExist))
+              .collect(Collectors.toList());
+      Preconditions.checkArgument(
+          invalidRequirements.isEmpty(), "Invalid create requirements: %s", 
invalidRequirements);
+    }
+
+    return isCreate;
+  }
+
+  private TableMetadata create(TableOperations ops, UpdateTableRequest 
request) {
+    // the only valid requirement is that the table will be created
+    request.requirements().forEach(requirement -> 
requirement.validate(ops.current()));
+    Optional<Integer> formatVersion =
+        request.updates().stream()
+            .filter(update -> update instanceof UpgradeFormatVersion)
+            .map(update -> ((UpgradeFormatVersion) update).formatVersion())
+            .findFirst();
+
+    TableMetadata.Builder builder =
+        
formatVersion.map(TableMetadata::buildFromEmpty).orElseGet(TableMetadata::buildFromEmpty);
+    request.updates().forEach(update -> update.applyTo(builder));
+    // create transactions do not retry. if the table exists, retrying is not 
a solution
+    ops.commit(null, builder.build());
+
+    return ops.current();
+  }
+
+  protected TableMetadata commit(TableOperations ops, UpdateTableRequest 
request) {
+    AtomicBoolean isRetry = new AtomicBoolean(false);
+    try {
+      Tasks.foreach(ops)
+          .retry(maxCommitRetries())
+          .exponentialBackoff(
+              COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+              2.0 /* exponential */)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(
+              taskOps -> {
+                TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
+                isRetry.set(true);
+
+                // validate requirements
+                try {
+                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
+                  throw new ValidationFailureException(e);
+                }
+
+                // apply changes
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
+
+                TableMetadata updated = metadataBuilder.build();
+                if (updated.changes().isEmpty()) {
+                  // do not commit if the metadata has not changed
+                  return;
+                }
+
+                // commit
+                taskOps.commit(base, updated);
+              });
+
+    } catch (ValidationFailureException e) {
+      throw e.wrapped();
+    }
+
+    return ops.current();
+  }
+
+  private BaseView asBaseView(View view) {
+    Preconditions.checkState(
+        view instanceof BaseView, "Cannot wrap catalog that does not produce 
BaseView");
+    return (BaseView) view;
+  }
+
+  public ListTablesResponse listViews(ViewCatalog catalog, Namespace 
namespace) {
+    return 
ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build();
+  }
+
+  public ListTablesResponse listViews(
+      ViewCatalog catalog, Namespace namespace, String pageToken, String 
pageSize) {
+    List<TableIdentifier> results = catalog.listViews(namespace);
+
+    Pair<List<TableIdentifier>, String> page =
+        paginate(results, pageToken, Integer.parseInt(pageSize));
+
+    return 
ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
+  }
+
+  public LoadViewResponse createView(
+      ViewCatalog catalog, Namespace namespace, CreateViewRequest request) {
+    request.validate();
+
+    ViewBuilder viewBuilder =
+        catalog
+            .buildView(TableIdentifier.of(namespace, request.name()))
+            .withSchema(request.schema())
+            .withProperties(request.properties())
+            .withDefaultNamespace(request.viewVersion().defaultNamespace())
+            .withDefaultCatalog(request.viewVersion().defaultCatalog())
+            .withLocation(request.location());
+
+    Set<String> unsupportedRepresentations =
+        request.viewVersion().representations().stream()
+            .filter(r -> !(r instanceof SQLViewRepresentation))
+            .map(ViewRepresentation::type)
+            .collect(Collectors.toSet());
+
+    if (!unsupportedRepresentations.isEmpty()) {
+      throw new IllegalStateException(
+          String.format("Found unsupported view representations: %s", 
unsupportedRepresentations));
+    }
+
+    request.viewVersion().representations().stream()
+        .filter(SQLViewRepresentation.class::isInstance)
+        .map(SQLViewRepresentation.class::cast)
+        .forEach(r -> viewBuilder.withQuery(r.dialect(), r.sql()));
+
+    View view = viewBuilder.create();
+
+    return viewResponse(view);
+  }
+
+  private LoadViewResponse viewResponse(View view) {
+    ViewMetadata metadata = asBaseView(view).operations().current();
+    return ImmutableLoadViewResponse.builder()
+        .metadata(metadata)
+        .metadataLocation(metadata.metadataFileLocation())
+        .build();
+  }
+
+  public void viewExists(ViewCatalog catalog, TableIdentifier viewIdentifier) {
+    if (!catalog.viewExists(viewIdentifier)) {
+      throw new NoSuchViewException("View does not exist: %s", viewIdentifier);
+    }
+  }
+
+  public LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier 
viewIdentifier) {
+    View view = catalog.loadView(viewIdentifier);
+    return viewResponse(view);
+  }
+
+  public LoadViewResponse updateView(
+      ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+    View view = catalog.loadView(ident);
+    ViewMetadata metadata = commit(asBaseView(view).operations(), request);
+
+    return ImmutableLoadViewResponse.builder()
+        .metadata(metadata)
+        .metadataLocation(metadata.metadataFileLocation())
+        .build();
+  }
+
+  public void renameView(ViewCatalog catalog, RenameTableRequest request) {
+    catalog.renameView(request.source(), request.destination());
+  }
+
+  public void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) {
+    boolean dropped = catalog.dropView(viewIdentifier);
+    if (!dropped) {
+      throw new NoSuchViewException("View does not exist: %s", viewIdentifier);
+    }
+  }
+
+  protected ViewMetadata commit(ViewOperations ops, UpdateTableRequest 
request) {
+    AtomicBoolean isRetry = new AtomicBoolean(false);
+    try {
+      Tasks.foreach(ops)
+          .retry(maxCommitRetries())
+          .exponentialBackoff(
+              COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+              2.0 /* exponential */)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(
+              taskOps -> {
+                ViewMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
+                isRetry.set(true);
+
+                // validate requirements
+                try {
+                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
+                  throw new ValidationFailureException(e);
+                }
+
+                // apply changes
+                ViewMetadata.Builder metadataBuilder = 
ViewMetadata.buildFrom(base);
+                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
+
+                ViewMetadata updated = metadataBuilder.build();
+
+                if (updated.changes().isEmpty()) {
+                  // do not commit if the metadata has not changed
+                  return;
+                }
+
+                // commit
+                taskOps.commit(base, updated);
+              });
+
+    } catch (ValidationFailureException e) {
+      throw e.wrapped();
+    }
+
+    return ops.current();
+  }
+
+  private int maxCommitRetries() {
+    return configurationStore.getConfiguration(
+        polarisCallContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES);
+  }
+}
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 4484dc129..4c9c527d6 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -35,7 +35,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.BadRequestException;
@@ -85,9 +84,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link IcebergRestCatalogApiService} implementation that delegates 
operations to {@link
- * org.apache.iceberg.rest.CatalogHandlers} after finding the appropriate 
{@link Catalog} for the
- * current {@link RealmContext}.
+ * An adapter between generated service types like 
`IcebergRestCatalogApiService` and
+ * `IcebergCatalogHandler`.
  */
 @RequestScoped
 public class IcebergCatalogAdapter
@@ -140,6 +138,7 @@ public class IcebergCatalogAdapter
   private final PolarisAuthorizer polarisAuthorizer;
   private final CatalogPrefixParser prefixParser;
   private final ReservedProperties reservedProperties;
+  private final CatalogHandlerUtils catalogHandlerUtils;
 
   @Inject
   public IcebergCatalogAdapter(
@@ -151,7 +150,8 @@ public class IcebergCatalogAdapter
       UserSecretsManager userSecretsManager,
       PolarisAuthorizer polarisAuthorizer,
       CatalogPrefixParser prefixParser,
-      ReservedProperties reservedProperties) {
+      ReservedProperties reservedProperties,
+      CatalogHandlerUtils catalogHandlerUtils) {
     this.realmContext = realmContext;
     this.callContext = callContext;
     this.catalogFactory = catalogFactory;
@@ -161,6 +161,7 @@ public class IcebergCatalogAdapter
     this.polarisAuthorizer = polarisAuthorizer;
     this.prefixParser = prefixParser;
     this.reservedProperties = reservedProperties;
+    this.catalogHandlerUtils = catalogHandlerUtils;
 
     // FIXME: This is a hack to set the current context for downstream calls.
     CallContext.setCurrentContext(callContext);
@@ -199,7 +200,8 @@ public class IcebergCatalogAdapter
         catalogFactory,
         catalogName,
         polarisAuthorizer,
-        reservedProperties);
+        reservedProperties,
+        catalogHandlerUtils);
   }
 
   @Override
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index aca12eb6f..972f6871e 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -55,7 +55,6 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.rest.CatalogHandlers;
 import org.apache.iceberg.rest.HTTPClient;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.credentials.ImmutableCredential;
@@ -128,6 +127,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
   private final UserSecretsManager userSecretsManager;
   private final CallContextCatalogFactory catalogFactory;
   private final ReservedProperties reservedProperties;
+  private final CatalogHandlerUtils catalogHandlerUtils;
 
   // Catalog instance will be initialized after authorizing resolver successfully resolves
   // the catalog entity.
@@ -147,12 +147,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
       CallContextCatalogFactory catalogFactory,
       String catalogName,
       PolarisAuthorizer authorizer,
-      ReservedProperties reservedProperties) {
+      ReservedProperties reservedProperties,
+      CatalogHandlerUtils catalogHandlerUtils) {
     super(callContext, entityManager, securityContext, catalogName, authorizer);
     this.metaStoreManager = metaStoreManager;
     this.userSecretsManager = userSecretsManager;
     this.catalogFactory = catalogFactory;
     this.reservedProperties = reservedProperties;
+    this.catalogHandlerUtils = catalogHandlerUtils;
   }
 
   /**
@@ -188,7 +190,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else {
-      return CatalogHandlers.listNamespaces(
+      return catalogHandlerUtils.listNamespaces(
           namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
     }
   }
@@ -255,7 +257,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES;
     authorizeBasicNamespaceOperationOrThrow(op, parent);
 
-    return CatalogHandlers.listNamespaces(namespaceCatalog, parent);
+    return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent);
   }
 
   public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) {
@@ -291,7 +293,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
           .setProperties(filteredProperties)
           .build();
     } else {
-      return CatalogHandlers.createNamespace(namespaceCatalog, request);
+      return catalogHandlerUtils.createNamespace(namespaceCatalog, request);
     }
   }
 
@@ -305,7 +307,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_NAMESPACE_METADATA;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);
 
-    return CatalogHandlers.loadNamespace(namespaceCatalog, namespace);
+    return catalogHandlerUtils.loadNamespace(namespaceCatalog, namespace);
   }
 
   public void namespaceExists(Namespace namespace) {
@@ -320,14 +322,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     authorizeBasicNamespaceOperationOrThrow(op, namespace);
 
     // TODO: Just skip CatalogHandlers for this one maybe
-    CatalogHandlers.loadNamespace(namespaceCatalog, namespace);
+    catalogHandlerUtils.loadNamespace(namespaceCatalog, namespace);
   }
 
   public void dropNamespace(Namespace namespace) {
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_NAMESPACE;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);
 
-    CatalogHandlers.dropNamespace(namespaceCatalog, namespace);
+    catalogHandlerUtils.dropNamespace(namespaceCatalog, namespace);
   }
 
   public UpdateNamespacePropertiesResponse updateNamespaceProperties(
@@ -335,7 +337,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.UPDATE_NAMESPACE_PROPERTIES;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);
 
-    return CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request);
+    return catalogHandlerUtils.updateNamespaceProperties(namespaceCatalog, namespace, request);
   }
 
   public ListTablesResponse listTables(Namespace namespace, String pageToken, Integer pageSize) {
@@ -349,7 +351,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else {
-      return CatalogHandlers.listTables(
+      return catalogHandlerUtils.listTables(
           baseCatalog, namespace, pageToken, String.valueOf(pageSize));
     }
   }
@@ -358,7 +360,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);
 
-    return CatalogHandlers.listTables(baseCatalog, namespace);
+    return catalogHandlerUtils.listTables(baseCatalog, namespace);
   }
 
   /**
@@ -391,7 +393,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
             .withWriteOrder(request.writeOrder())
             .setProperties(reservedProperties.removeReservedProperties(request.properties()))
             .build();
-    return CatalogHandlers.createTable(baseCatalog, namespace, requestWithoutReservedProperties);
+    return catalogHandlerUtils.createTable(
+        baseCatalog, namespace, requestWithoutReservedProperties);
   }
 
   /**
@@ -553,7 +556,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
         op, TableIdentifier.of(namespace, request.name()));
 
-    return CatalogHandlers.registerTable(baseCatalog, namespace, request);
+    return catalogHandlerUtils.registerTable(baseCatalog, namespace, request);
   }
 
   public boolean sendNotification(TableIdentifier identifier, NotificationRequest request) {
@@ -646,7 +649,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
       }
     }
 
-    LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, tableIdentifier);
+    LoadTableResponse rawResponse = catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier);
     return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
   }
 
@@ -824,7 +827,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot update table on static-facade external catalogs.");
     }
-    return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request));
+    return catalogHandlerUtils.updateTable(
+        baseCatalog, tableIdentifier, applyUpdateFilters(request));
   }
 
   public LoadTableResponse updateTableForStagedCreate(
@@ -841,7 +845,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot update table on static-facade external catalogs.");
     }
-    return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request));
+    return catalogHandlerUtils.updateTable(
+        baseCatalog, tableIdentifier, applyUpdateFilters(request));
   }
 
   public void dropTableWithoutPurge(TableIdentifier tableIdentifier) {
@@ -849,7 +854,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     authorizeBasicTableLikeOperationOrThrow(
         op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier);
 
-    CatalogHandlers.dropTable(baseCatalog, tableIdentifier);
+    catalogHandlerUtils.dropTable(baseCatalog, tableIdentifier);
   }
 
   public void dropTableWithPurge(TableIdentifier tableIdentifier) {
@@ -866,7 +871,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot drop table on static-facade external catalogs.");
     }
-    CatalogHandlers.purgeTable(baseCatalog, tableIdentifier);
+    catalogHandlerUtils.purgeTable(baseCatalog, tableIdentifier);
   }
 
   public void tableExists(TableIdentifier tableIdentifier) {
@@ -875,7 +880,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
         op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier);
 
     // TODO: Just skip CatalogHandlers for this one maybe
-    CatalogHandlers.loadTable(baseCatalog, tableIdentifier);
+    catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier);
   }
 
   public void renameTable(RenameTableRequest request) {
@@ -892,7 +897,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot rename table on static-facade external catalogs.");
     }
-    CatalogHandlers.renameTable(baseCatalog, request);
+    catalogHandlerUtils.renameTable(baseCatalog, request);
   }
 
   public void commitTransaction(CommitTransactionRequest commitTransactionRequest) {
@@ -1012,7 +1017,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else if (baseCatalog instanceof ViewCatalog viewCatalog) {
-      return CatalogHandlers.listViews(viewCatalog, namespace, pageToken, String.valueOf(pageSize));
+      return catalogHandlerUtils.listViews(
+          viewCatalog, namespace, pageToken, String.valueOf(pageSize));
     } else {
       throw new BadRequestException(
           "Unsupported operation: listViews with baseCatalog type: %s",
@@ -1024,7 +1030,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS;
     authorizeBasicNamespaceOperationOrThrow(op, namespace);
 
-    return CatalogHandlers.listViews(viewCatalog, namespace);
+    return catalogHandlerUtils.listViews(viewCatalog, namespace);
   }
 
   public LoadViewResponse createView(Namespace namespace, CreateViewRequest request) {
@@ -1041,14 +1047,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot create view on static-facade external catalogs.");
     }
-    return CatalogHandlers.createView(viewCatalog, namespace, request);
+    return catalogHandlerUtils.createView(viewCatalog, namespace, request);
   }
 
   public LoadViewResponse loadView(TableIdentifier viewIdentifier) {
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_VIEW;
     authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier);
 
-    return CatalogHandlers.loadView(viewCatalog, viewIdentifier);
+    return catalogHandlerUtils.loadView(viewCatalog, viewIdentifier);
   }
 
   public LoadViewResponse replaceView(TableIdentifier viewIdentifier, UpdateTableRequest request) {
@@ -1064,14 +1070,14 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot replace view on static-facade external catalogs.");
     }
-    return CatalogHandlers.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request));
+    return catalogHandlerUtils.updateView(viewCatalog, viewIdentifier, applyUpdateFilters(request));
   }
 
   public void dropView(TableIdentifier viewIdentifier) {
     PolarisAuthorizableOperation op = PolarisAuthorizableOperation.DROP_VIEW;
     authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier);
 
-    CatalogHandlers.dropView(viewCatalog, viewIdentifier);
+    catalogHandlerUtils.dropView(viewCatalog, viewIdentifier);
   }
 
   public void viewExists(TableIdentifier viewIdentifier) {
@@ -1079,7 +1085,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.ICEBERG_VIEW, viewIdentifier);
 
     // TODO: Just skip CatalogHandlers for this one maybe
-    CatalogHandlers.loadView(viewCatalog, viewIdentifier);
+    catalogHandlerUtils.loadView(viewCatalog, viewIdentifier);
   }
 
   public void renameView(RenameTableRequest request) {
@@ -1096,7 +1102,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
     if (isStaticFacade(catalog)) {
       throw new BadRequestException("Cannot rename view on static-facade external catalogs.");
     }
-    CatalogHandlers.renameView(viewCatalog, request);
+    catalogHandlerUtils.renameView(viewCatalog, request);
   }
 
   private @Nonnull LoadTableResponse filterResponseToSnapshots(
diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 34a3caf45..4301fe993 100644
--- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -49,6 +49,7 @@ import org.apache.polaris.service.admin.api.PolarisCatalogsApi;
 import org.apache.polaris.service.catalog.DefaultCatalogPrefixParser;
 import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
 import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApi;
+import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
@@ -191,6 +192,9 @@ public record TestServices(
 
       ReservedProperties reservedProperties = ReservedProperties.NONE;
 
+      CatalogHandlerUtils catalogHandlerUtils =
+          new CatalogHandlerUtils(callContext.getPolarisCallContext(), configurationStore);
+
       IcebergCatalogAdapter service =
           new IcebergCatalogAdapter(
               realmContext,
@@ -201,7 +205,8 @@ public record TestServices(
               userSecretsManager,
               authorizer,
               new DefaultCatalogPrefixParser(),
-              reservedProperties);
+              reservedProperties,
+              catalogHandlerUtils);
 
       IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
       IcebergRestConfigurationApi restConfigurationApi = new IcebergRestConfigurationApi(service);


Reply via email to