flyrain commented on code in PR #3719:
URL: https://github.com/apache/polaris/pull/3719#discussion_r2875618292


##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -287,25 +287,75 @@ protected Map<String, String> properties() {
 
   @Override
   public Table registerTable(TableIdentifier identifier, String 
metadataFileLocation) {
+    return registerTable(identifier, metadataFileLocation, false);
+  }
+
+  /**
+   * Register a table with optional overwrite semantics.
+   *
+   * <p>When {@code overwrite} is false (the default) this behaves like a 
normal register and will
+   * fail if the table already exists. When {@code overwrite} is true and the 
named table already
+   * exists, this method updates the table's stored metadata-location to point 
at the provided
+   * metadata file. The overwrite path performs additional validation to 
ensure the supplied
+   * metadata file and its location are consistent with the table's resolved 
storage configuration.
+   *
+   * @param identifier the table identifier
+   * @param metadataFileLocation the metadata file location
+   * @param overwrite if true, update existing table metadata; if false, throw 
exception if table
+   *     exists
+   * @return the registered table
+   */
+  public Table registerTable(
+      TableIdentifier identifier, String metadataFileLocation, boolean 
overwrite) {
+    LOGGER.debug(
+        "registerTable called with identifier={}, metadataFileLocation={}, 
overwrite={}",
+        identifier,
+        metadataFileLocation,
+        overwrite);
     Preconditions.checkArgument(
         identifier != null && isValidIdentifier(identifier), "Invalid 
identifier: %s", identifier);
     Preconditions.checkArgument(
         metadataFileLocation != null && !metadataFileLocation.isEmpty(),
         "Cannot register an empty metadata file location as a table");
 
     int lastSlashIndex = metadataFileLocation.lastIndexOf("/");
+    LOGGER.debug(
+        "Computed lastSlashIndex={} for metadataFileLocation={}",
+        lastSlashIndex,
+        metadataFileLocation);
     Preconditions.checkArgument(
         lastSlashIndex != -1,
         "Invalid metadata file location; metadata file location must be 
absolute and contain a '/': %s",
         metadataFileLocation);
 
     // Throw an exception if this table already exists in the catalog.
-    if (tableExists(identifier)) {
+    boolean tableExists = tableExists(identifier);
+    LOGGER.debug("tableExists for identifier {} = {}", identifier, 
tableExists);
+    if (!overwrite && tableExists) {
+      LOGGER.debug("Table already exists and overwrite is false for 
identifier={}", identifier);

Review Comment:
   Nit: we can remove the debug log, since we throw an exception right after it.



##########
polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java:
##########
@@ -127,6 +128,7 @@ public enum PolarisAuthorizableOperation {
   CREATE_TABLE_STAGED(TABLE_CREATE),
   CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, 
TABLE_WRITE_DATA)),
   REGISTER_TABLE(TABLE_CREATE),
+  REGISTER_TABLE_OVERWRITE(TABLE_FULL_METADATA),

Review Comment:
   Given that Polaris doesn't persist the value of this enum, it is probably 
fine to insert a new one in the middle. 
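
   To illustrate why persistence is the deciding factor, here is a minimal
   sketch. The two enums are hypothetical stand-ins (the constant `LOAD_TABLE`
   and its position are assumptions, not copied from Polaris). Inserting a
   constant in the middle shifts the ordinal of everything after it, which
   would only matter if ordinals were stored somewhere; lookups by name are
   unaffected.

   ```java
   // Hypothetical stand-ins for illustration; not the real Polaris enum.
   final class EnumOrdinalSketch {
     enum Before { CREATE_TABLE_STAGED, REGISTER_TABLE, LOAD_TABLE }

     enum After { CREATE_TABLE_STAGED, REGISTER_TABLE, REGISTER_TABLE_OVERWRITE, LOAD_TABLE }

     // Name-based lookup: unaffected by where a new constant is inserted.
     static boolean nameSurvives(String name) {
       return After.valueOf(name).name().equals(Before.valueOf(name).name());
     }

     // Ordinal-based lookup: shifts for every constant after the insertion point.
     static int ordinalShift(String name) {
       return After.valueOf(name).ordinal() - Before.valueOf(name).ordinal();
     }

     public static void main(String[] args) {
       System.out.println(ordinalShift("REGISTER_TABLE")); // 0
       System.out.println(ordinalShift("LOAD_TABLE")); // 1
       System.out.println(nameSurvives("LOAD_TABLE")); // true
     }
   }
   ```

   If the enum were ever serialized by ordinal, appending at the end would be
   the safer choice.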



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, 
overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", 
identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; 
otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in 
one pass because the
+   * standard helpers either assume the table exists or always authorize 
against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since 
initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier 
identifier) {
+    LOGGER.debug("authorizeRegisterTableOverwriteOrCreate: start for {}", 
identifier);
+    // Build a resolution manifest that includes the namespace and optional 
table path.
+    resolutionManifest = newResolutionManifest();
+    resolutionManifest.addPath(
+        new ResolverPath(
+            Arrays.asList(identifier.namespace().levels()), 
PolarisEntityType.NAMESPACE),
+        identifier.namespace());
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.tableIdentifierToList(identifier),
+            PolarisEntityType.TABLE_LIKE,
+            true /* optional */),
+        identifier);
+    resolutionManifest.resolveAll();
+    PolarisResolvedPathWrapper tableTarget =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    if (tableTarget != null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: found existing table 
target for {}, requiring REGISTER_TABLE_OVERWRITE",
+          identifier);
+      // Overwrite on an existing table requires full metadata permissions.
+      authorizer()
+          .authorizeOrThrow(
+              polarisPrincipal(),
+              resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+              PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
+              tableTarget,
+              null /* secondary */);
+      initializeCatalog();
+      LOGGER.debug(
+          "registerTable: overwrite=true, authorized for 
REGISTER_TABLE_OVERWRITE on existing table {}",
+          identifier);
+      return;
+    }
+
+    // Table doesn't exist, fall back to standard register-table authorization.
+    LOGGER.debug(
+        "authorizeRegisterTableOverwriteOrCreate: table not found for {}, 
falling back to REGISTER_TABLE on namespace",
+        identifier);
+    PolarisResolvedPathWrapper namespaceTarget =
+        resolutionManifest.getResolvedPath(identifier.namespace(), true);
+    if (namespaceTarget == null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: namespace not found for 
{}",
+          identifier.namespace());
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
identifier.namespace());
+    }
+    authorizer()
+        .authorizeOrThrow(
+            polarisPrincipal(),
+            resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+            PolarisAuthorizableOperation.REGISTER_TABLE,
+            namespaceTarget,
+            null /* secondary */);
+    initializeCatalog();
+    LOGGER.debug(
+        "authorizeRegisterTableOverwriteOrCreate: authorized REGISTER_TABLE on 
namespace {}",
+        identifier.namespace());
+  }
+
+  private LoadTableResponse registerTableWithOverwrite(
+      TableIdentifier identifier, RegisterTableRequest request) {
+    LOGGER.debug(
+        "registerTableWithOverwrite: identifier={}, metadataLocation={}",
+        identifier,
+        request.metadataLocation());
+    // Handle Polaris-specific overwrite logic
+    if (baseCatalog instanceof IcebergCatalog icebergCatalog) {
+      LOGGER.debug(
+          "registerTableWithOverwrite: using IcebergCatalog.registerTable for 
{}", identifier);

Review Comment:
   This debug message doesn't seem useful. Can we remove it?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, 
overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", 
identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; 
otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in 
one pass because the
+   * standard helpers either assume the table exists or always authorize 
against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since 
initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier 
identifier) {

Review Comment:
   I think we can just call it `authorizeRegisterTableOverwrite`, since 
register-table itself already implies creation.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, 
overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", 
identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; 
otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in 
one pass because the
+   * standard helpers either assume the table exists or always authorize 
against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since 
initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier 
identifier) {
+    LOGGER.debug("authorizeRegisterTableOverwriteOrCreate: start for {}", 
identifier);
+    // Build a resolution manifest that includes the namespace and optional 
table path.
+    resolutionManifest = newResolutionManifest();
+    resolutionManifest.addPath(
+        new ResolverPath(
+            Arrays.asList(identifier.namespace().levels()), 
PolarisEntityType.NAMESPACE),
+        identifier.namespace());
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.tableIdentifierToList(identifier),
+            PolarisEntityType.TABLE_LIKE,
+            true /* optional */),
+        identifier);
+    resolutionManifest.resolveAll();
+    PolarisResolvedPathWrapper tableTarget =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    if (tableTarget != null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: found existing table 
target for {}, requiring REGISTER_TABLE_OVERWRITE",
+          identifier);
+      // Overwrite on an existing table requires full metadata permissions.
+      authorizer()
+          .authorizeOrThrow(
+              polarisPrincipal(),
+              resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+              PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
+              tableTarget,
+              null /* secondary */);
+      initializeCatalog();
+      LOGGER.debug(
+          "registerTable: overwrite=true, authorized for 
REGISTER_TABLE_OVERWRITE on existing table {}",
+          identifier);
+      return;
+    }
+
+    // Table doesn't exist, fall back to standard register-table authorization.
+    LOGGER.debug(
+        "authorizeRegisterTableOverwriteOrCreate: table not found for {}, 
falling back to REGISTER_TABLE on namespace",
+        identifier);
+    PolarisResolvedPathWrapper namespaceTarget =
+        resolutionManifest.getResolvedPath(identifier.namespace(), true);
+    if (namespaceTarget == null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: namespace not found for 
{}",
+          identifier.namespace());
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
identifier.namespace());
+    }
+    authorizer()
+        .authorizeOrThrow(
+            polarisPrincipal(),
+            resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+            PolarisAuthorizableOperation.REGISTER_TABLE,
+            namespaceTarget,
+            null /* secondary */);
+    initializeCatalog();
+    LOGGER.debug(
+        "authorizeRegisterTableOverwriteOrCreate: authorized REGISTER_TABLE on 
namespace {}",
+        identifier.namespace());
+  }
+
+  private LoadTableResponse registerTableWithOverwrite(
+      TableIdentifier identifier, RegisterTableRequest request) {
+    LOGGER.debug(
+        "registerTableWithOverwrite: identifier={}, metadataLocation={}",
+        identifier,
+        request.metadataLocation());
+    // Handle Polaris-specific overwrite logic
+    if (baseCatalog instanceof IcebergCatalog icebergCatalog) {
+      LOGGER.debug(
+          "registerTableWithOverwrite: using IcebergCatalog.registerTable for 
{}", identifier);
+      // Use the overwrite-capable registration path for IcebergCatalog
+      Table table = icebergCatalog.registerTable(identifier, 
request.metadataLocation(), true);
+      if (table instanceof BaseTable baseTable) {
+        TableMetadata metadata = baseTable.operations().current();
+        LOGGER.debug(
+            "registerTableWithOverwrite: registered and loaded metadata for {} 
(metadataLocation={})",
+            identifier,
+            metadata.location());
+        return LoadTableResponse.builder().withTableMetadata(metadata).build();
+      }
+      LOGGER.debug(
+          "registerTableWithOverwrite: unexpected table implementation for 
{}", identifier);
+      throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");
+    }
+
+    // For non-Polaris catalogs, reject overwrite to prevent accidental 
overwrites
+    LOGGER.debug(
+        "registerTableWithOverwrite: unsupported baseCatalog type {} for 
overwrite on {}",
+        baseCatalog.getClass().getName(),
+        identifier);

Review Comment:
   Can we remove the debug log as the next line throws?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, 
overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", 
identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; 
otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in 
one pass because the
+   * standard helpers either assume the table exists or always authorize 
against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since 
initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier 
identifier) {
+    LOGGER.debug("authorizeRegisterTableOverwriteOrCreate: start for {}", 
identifier);
+    // Build a resolution manifest that includes the namespace and optional 
table path.
+    resolutionManifest = newResolutionManifest();
+    resolutionManifest.addPath(
+        new ResolverPath(
+            Arrays.asList(identifier.namespace().levels()), 
PolarisEntityType.NAMESPACE),
+        identifier.namespace());
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.tableIdentifierToList(identifier),
+            PolarisEntityType.TABLE_LIKE,
+            true /* optional */),
+        identifier);
+    resolutionManifest.resolveAll();
+    PolarisResolvedPathWrapper tableTarget =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    if (tableTarget != null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: found existing table 
target for {}, requiring REGISTER_TABLE_OVERWRITE",
+          identifier);
+      // Overwrite on an existing table requires full metadata permissions.
+      authorizer()
+          .authorizeOrThrow(
+              polarisPrincipal(),
+              resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+              PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
+              tableTarget,
+              null /* secondary */);
+      initializeCatalog();
+      LOGGER.debug(
+          "registerTable: overwrite=true, authorized for 
REGISTER_TABLE_OVERWRITE on existing table {}",
+          identifier);
+      return;
+    }
+
+    // Table doesn't exist, fall back to standard register-table authorization.
+    LOGGER.debug(
+        "authorizeRegisterTableOverwriteOrCreate: table not found for {}, 
falling back to REGISTER_TABLE on namespace",
+        identifier);
+    PolarisResolvedPathWrapper namespaceTarget =
+        resolutionManifest.getResolvedPath(identifier.namespace(), true);
+    if (namespaceTarget == null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: namespace not found for 
{}",
+          identifier.namespace());
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
identifier.namespace());
+    }
+    authorizer()
+        .authorizeOrThrow(
+            polarisPrincipal(),
+            resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+            PolarisAuthorizableOperation.REGISTER_TABLE,
+            namespaceTarget,
+            null /* secondary */);
+    initializeCatalog();
+    LOGGER.debug(
+        "authorizeRegisterTableOverwriteOrCreate: authorized REGISTER_TABLE on 
namespace {}",
+        identifier.namespace());
+  }
+
+  private LoadTableResponse registerTableWithOverwrite(
+      TableIdentifier identifier, RegisterTableRequest request) {
+    LOGGER.debug(
+        "registerTableWithOverwrite: identifier={}, metadataLocation={}",
+        identifier,
+        request.metadataLocation());
+    // Handle Polaris-specific overwrite logic
+    if (baseCatalog instanceof IcebergCatalog icebergCatalog) {
+      LOGGER.debug(
+          "registerTableWithOverwrite: using IcebergCatalog.registerTable for 
{}", identifier);
+      // Use the overwrite-capable registration path for IcebergCatalog
+      Table table = icebergCatalog.registerTable(identifier, 
request.metadataLocation(), true);
+      if (table instanceof BaseTable baseTable) {
+        TableMetadata metadata = baseTable.operations().current();
+        LOGGER.debug(
+            "registerTableWithOverwrite: registered and loaded metadata for {} 
(metadataLocation={})",
+            identifier,
+            metadata.location());
+        return LoadTableResponse.builder().withTableMetadata(metadata).build();
+      }
+      LOGGER.debug(
+          "registerTableWithOverwrite: unexpected table implementation for 
{}", identifier);

Review Comment:
   Can we remove this debug log?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, 
overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", 
identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; 
otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in 
one pass because the
+   * standard helpers either assume the table exists or always authorize 
against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since 
initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier 
identifier) {
+    LOGGER.debug("authorizeRegisterTableOverwriteOrCreate: start for {}", 
identifier);
+    // Build a resolution manifest that includes the namespace and optional 
table path.
+    resolutionManifest = newResolutionManifest();
+    resolutionManifest.addPath(
+        new ResolverPath(
+            Arrays.asList(identifier.namespace().levels()), 
PolarisEntityType.NAMESPACE),
+        identifier.namespace());
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.tableIdentifierToList(identifier),
+            PolarisEntityType.TABLE_LIKE,
+            true /* optional */),
+        identifier);
+    resolutionManifest.resolveAll();
+    PolarisResolvedPathWrapper tableTarget =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    if (tableTarget != null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: found existing table 
target for {}, requiring REGISTER_TABLE_OVERWRITE",
+          identifier);
+      // Overwrite on an existing table requires full metadata permissions.
+      authorizer()
+          .authorizeOrThrow(
+              polarisPrincipal(),
+              resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+              PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
+              tableTarget,
+              null /* secondary */);
+      initializeCatalog();
+      LOGGER.debug(
+          "registerTable: overwrite=true, authorized for 
REGISTER_TABLE_OVERWRITE on existing table {}",
+          identifier);
+      return;
+    }
+
+    // Table doesn't exist, fall back to standard register-table authorization.

Review Comment:
   Agreed with @nandorKollar. I think the more consistent behavior here is to 
still use the `REGISTER_TABLE_OVERWRITE` operation.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table 
metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, 
overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = 
PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", 
identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, 
request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; 
otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in 
one pass because the
+   * standard helpers either assume the table exists or always authorize 
against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since 
initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier 
identifier) {
+    LOGGER.debug("authorizeRegisterTableOverwriteOrCreate: start for {}", 
identifier);
+    // Build a resolution manifest that includes the namespace and optional 
table path.
+    resolutionManifest = newResolutionManifest();
+    resolutionManifest.addPath(
+        new ResolverPath(
+            Arrays.asList(identifier.namespace().levels()), 
PolarisEntityType.NAMESPACE),
+        identifier.namespace());
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.tableIdentifierToList(identifier),
+            PolarisEntityType.TABLE_LIKE,
+            true /* optional */),
+        identifier);
+    resolutionManifest.resolveAll();
+    PolarisResolvedPathWrapper tableTarget =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    if (tableTarget != null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: found existing table 
target for {}, requiring REGISTER_TABLE_OVERWRITE",
+          identifier);
+      // Overwrite on an existing table requires full metadata permissions.
+      authorizer()
+          .authorizeOrThrow(
+              polarisPrincipal(),
+              resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+              PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
+              tableTarget,
+              null /* secondary */);
+      initializeCatalog();
+      LOGGER.debug(
+          "registerTable: overwrite=true, authorized for 
REGISTER_TABLE_OVERWRITE on existing table {}",
+          identifier);
+      return;
+    }
+
+    // Table doesn't exist, fall back to standard register-table authorization.

Review Comment:
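   We can also consolidate the code like this:
   
   ```
   var target =
       resolutionManifest.getResolvedPath(
           identifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE, true);
   if (target == null) {
     // Table doesn't exist yet: authorize against the parent namespace instead.
     target = resolutionManifest.getResolvedPath(identifier.namespace(), true);
   }

   authorizer()
       .authorizeOrThrow(
           polarisPrincipal(),
           resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
           PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
           target,
           null);
   ```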
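
   For reference, the fallback in the suggestion above can be sketched in
   isolation. Everything here is a hypothetical stand-in (a plain `Set` of
   resolved paths, string targets), not Polaris code. One thing the sketch
   makes explicit: the two-branch version in the diff throws
   `NoSuchNamespaceException` when the namespace is missing too, so a
   consolidated version would presumably still want a null check before
   authorizing.

   ```java
   import java.util.Set;

   // Hypothetical stand-ins for illustration; none of these are Polaris types.
   final class ConsolidatedAuthSketch {
     // Picks the single target to authorize against: the table if it resolved,
     // otherwise its parent namespace.
     static String pickTarget(Set<String> resolved, String table, String namespace) {
       String target = resolved.contains(table) ? table : null;
       if (target == null && resolved.contains(namespace)) {
         target = namespace;
       }
       if (target == null) {
         // Mirrors the missing-namespace branch of the two-branch code.
         throw new IllegalStateException("Namespace does not exist: " + namespace);
       }
       return target;
     }

     public static void main(String[] args) {
       System.out.println(pickTarget(Set.of("ns", "ns.t"), "ns.t", "ns")); // ns.t
       System.out.println(pickTarget(Set.of("ns"), "ns.t", "ns")); // ns
     }
   }
   ```

   With a single target, the same operation is checked on whichever entity
   exists, which is the consistency point raised in the previous comment.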



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -330,6 +380,155 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
     return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
   }
 
+  private Table overwriteRegisteredTable(
+      TableIdentifier identifier, String metadataFileLocation, String 
locationDir) {
+    LOGGER.debug(
+        "overwriteRegisteredTable called with identifier={}, 
metadataFileLocation={}, locationDir={}",
+        identifier,
+        metadataFileLocation,
+        locationDir);
+
+    /*
+     * High-level overview:
+     * - Resolve the authorized parent for the table so we know the storage 
context.
+     * - Validate and read the provided metadata file using a FileIO tied to 
that
+     *   storage context.
+     * - Ensure the target entity exists and is the correct table-like subtype.
+     * - Replace the stored entity properties (including metadata-location) 
with
+     *   values derived from the parsed TableMetadata and persist the change.
+     */
+
+    // Resolve the table path to obtain storage context for overwrite 
validation.
+    PolarisResolvedPathWrapper resolvedPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ANY_SUBTYPE);
+    if (resolvedPath == null || resolvedPath.getRawLeafEntity() == null) {
+      LOGGER.debug("No resolved path or leaf entity found for identifier={}", 
identifier);
+      throw new NoSuchTableException("Table does not exist: %s", identifier);
+    }
+    LOGGER.debug(
+        "Resolved path for identifier={} -> leafEntityId={}, 
leafEntityName={}",
+        identifier,
+        resolvedPath.getRawLeafEntity().getId(),
+        resolvedPath.getRawLeafEntity().getName());
+
+    // Validate the supplied metadata file location against the resolved 
storage.
+    LOGGER.debug(
+        "Validating supplied metadataFileLocation={} against resolved storage 
for identifier={}",
+        metadataFileLocation,
+        identifier);
+    validateLocationForTableLike(identifier, metadataFileLocation, 
resolvedPath);
+
+    // Configure FileIO for the resolved storage and read the metadata file.
+    LOGGER.debug(
+        "Loading FileIO for identifier={} to read metadata at {}", identifier, 
locationDir);
+    FileIO fileIO =
+        loadFileIOForTableLike(
+            identifier,
+            Set.of(locationDir),
+            resolvedPath,
+            new HashMap<>(tableDefaultProperties),
+            Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
+
+    LOGGER.debug(
+        "Reading TableMetadata from metadataFileLocation={} using resolved 
FileIO",
+        metadataFileLocation);
+    TableMetadata metadata = TableMetadataParser.read(fileIO, 
metadataFileLocation);
+    LOGGER.debug(
+        "Parsed TableMetadata for identifier={} -> uuid={}, location={}",
+        identifier,
+        metadata.uuid(),
+        metadata.location());
+
+    LOGGER.debug(
+        "Validating metadata.location()={} for identifier={}", 
metadata.location(), identifier);
+    validateLocationForTableLike(identifier, metadata.location(), 
resolvedPath);
+    validateMetadataFileInTableDir(identifier, metadata);
+    LOGGER.debug(
+        "Metadata file validated to be within table directory for 
identifier={}", identifier);
+
+    PolarisResolvedPathWrapper resolvedStorageEntity = resolvedPath;
+
+    List<PolarisEntity> resolvedNamespace = resolvedPath.getRawParentPath();
+    Set<String> dataLocations =
+        StorageUtil.getLocationsUsedByTable(metadata.location(), 
metadata.properties());
+    LOGGER.debug(
+        "Validating data locations {} for identifier={} against resolved 
storage entity",
+        dataLocations,
+        identifier);
+    // Mirror updateTable location validation to prevent invalid or 
overlapping locations.
+    CatalogUtils.validateLocationsForTableLike(
+        realmConfig, identifier, dataLocations, resolvedStorageEntity);
+    dataLocations.forEach(
+        location ->
+            validateNoLocationOverlap(
+                catalogEntity,
+                identifier,
+                resolvedNamespace,
+                location,
+                resolvedStorageEntity.getRawLeafEntity()));
+    LOGGER.debug("Location overlap validation completed for identifier={}", 
identifier);
+
+    // Ensure the raw entity is an Iceberg table-like entity (not a 
view/generic table).
+    PolarisEntity rawEntity = resolvedPath.getRawLeafEntity();
+    LOGGER.debug(
+        "Existing entity for identifier={} has subType={}", identifier, 
rawEntity.getSubType());
+    if (rawEntity.getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
+      LOGGER.debug(
+          "Existing entity is a view for identifier={} - cannot overwrite as 
table", identifier);
+      throw new AlreadyExistsException("View with same name already exists: 
%s", identifier);
+    } else if (rawEntity.getSubType() == PolarisEntitySubType.GENERIC_TABLE) {
+      LOGGER.debug(
+          "Existing entity is a generic table for identifier={} - cannot 
overwrite as iceberg table",
+          identifier);
+      throw new AlreadyExistsException(
+          "Generic table with same name already exists: %s", identifier);
+    }
+
+    IcebergTableLikeEntity existingEntity = IcebergTableLikeEntity.of(rawEntity);
+    if (existingEntity == null) {

Review Comment:
   Is it possible for `existingEntity` to be null here?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -330,6 +380,155 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
     return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
   }
 
+  private Table overwriteRegisteredTable(
+      TableIdentifier identifier, String metadataFileLocation, String 
locationDir) {
+    LOGGER.debug(
+        "overwriteRegisteredTable called with identifier={}, 
metadataFileLocation={}, locationDir={}",
+        identifier,
+        metadataFileLocation,
+        locationDir);
+
+    /*
+     * High-level overview:
+     * - Resolve the authorized parent for the table so we know the storage 
context.
+     * - Validate and read the provided metadata file using a FileIO tied to 
that
+     *   storage context.
+     * - Ensure the target entity exists and is the correct table-like subtype.
+     * - Replace the stored entity properties (including metadata-location) 
with
+     *   values derived from the parsed TableMetadata and persist the change.
+     */
+
+    // Resolve the table path to obtain storage context for overwrite 
validation.
+    PolarisResolvedPathWrapper resolvedPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ANY_SUBTYPE);
+    if (resolvedPath == null || resolvedPath.getRawLeafEntity() == null) {
+      LOGGER.debug("No resolved path or leaf entity found for identifier={}", 
identifier);
+      throw new NoSuchTableException("Table does not exist: %s", identifier);
+    }
+    LOGGER.debug(
+        "Resolved path for identifier={} -> leafEntityId={}, 
leafEntityName={}",
+        identifier,
+        resolvedPath.getRawLeafEntity().getId(),
+        resolvedPath.getRawLeafEntity().getName());
+
+    // Validate the supplied metadata file location against the resolved 
storage.
+    LOGGER.debug(
+        "Validating supplied metadataFileLocation={} against resolved storage 
for identifier={}",
+        metadataFileLocation,
+        identifier);
+    validateLocationForTableLike(identifier, metadataFileLocation, 
resolvedPath);
+
+    // Configure FileIO for the resolved storage and read the metadata file.
+    LOGGER.debug(
+        "Loading FileIO for identifier={} to read metadata at {}", identifier, 
locationDir);
+    FileIO fileIO =
+        loadFileIOForTableLike(
+            identifier,
+            Set.of(locationDir),
+            resolvedPath,
+            new HashMap<>(tableDefaultProperties),
+            Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
+
+    LOGGER.debug(
+        "Reading TableMetadata from metadataFileLocation={} using resolved 
FileIO",
+        metadataFileLocation);
+    TableMetadata metadata = TableMetadataParser.read(fileIO, 
metadataFileLocation);
+    LOGGER.debug(
+        "Parsed TableMetadata for identifier={} -> uuid={}, location={}",
+        identifier,
+        metadata.uuid(),
+        metadata.location());
+
+    LOGGER.debug(
+        "Validating metadata.location()={} for identifier={}", 
metadata.location(), identifier);
+    validateLocationForTableLike(identifier, metadata.location(), 
resolvedPath);
+    validateMetadataFileInTableDir(identifier, metadata);
+    LOGGER.debug(
+        "Metadata file validated to be within table directory for 
identifier={}", identifier);
+
+    PolarisResolvedPathWrapper resolvedStorageEntity = resolvedPath;
+
+    List<PolarisEntity> resolvedNamespace = resolvedPath.getRawParentPath();
+    Set<String> dataLocations =
+        StorageUtil.getLocationsUsedByTable(metadata.location(), 
metadata.properties());
+    LOGGER.debug(
+        "Validating data locations {} for identifier={} against resolved 
storage entity",
+        dataLocations,
+        identifier);
+    // Mirror updateTable location validation to prevent invalid or 
overlapping locations.
+    CatalogUtils.validateLocationsForTableLike(
+        realmConfig, identifier, dataLocations, resolvedStorageEntity);
+    dataLocations.forEach(
+        location ->
+            validateNoLocationOverlap(
+                catalogEntity,
+                identifier,
+                resolvedNamespace,
+                location,
+                resolvedStorageEntity.getRawLeafEntity()));
+    LOGGER.debug("Location overlap validation completed for identifier={}", 
identifier);
+
+    // Ensure the raw entity is an Iceberg table-like entity (not a 
view/generic table).
+    PolarisEntity rawEntity = resolvedPath.getRawLeafEntity();
+    LOGGER.debug(
+        "Existing entity for identifier={} has subType={}", identifier, 
rawEntity.getSubType());
+    if (rawEntity.getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
+      LOGGER.debug(
+          "Existing entity is a view for identifier={} - cannot overwrite as 
table", identifier);
+      throw new AlreadyExistsException("View with same name already exists: 
%s", identifier);
+    } else if (rawEntity.getSubType() == PolarisEntitySubType.GENERIC_TABLE) {
+      LOGGER.debug(
+          "Existing entity is a generic table for identifier={} - cannot 
overwrite as iceberg table",
+          identifier);
+      throw new AlreadyExistsException(
+          "Generic table with same name already exists: %s", identifier);
+    }
+
+    IcebergTableLikeEntity existingEntity = 
IcebergTableLikeEntity.of(rawEntity);
+    if (existingEntity == null) {
+      LOGGER.debug(
+          "Failed to convert rawEntity to IcebergTableLikeEntity for 
identifier={}", identifier);
+      throw new NoSuchTableException("Table does not exist: %s", identifier);
+    }
+
+    // Build updated entity from parsed metadata and persist the update.
+    // NOTE: This updates the Iceberg table UUID to match the new metadata 
file's UUID
+    // (via buildTableMetadataPropertiesMap which extracts metadata.uuid()).
+    // The Polaris entity ID remains unchanged to preserve grants/permissions.
+    // See https://lists.apache.org/thread/b5k7vdng904zr3n3q8wv83y8l30rnd4c
+    // and https://lists.apache.org/thread/k3595bttvohb6c3ms36o16gppdfllqmp
+    Map<String, String> storedProperties = 
buildTableMetadataPropertiesMap(metadata);
+    LOGGER.debug(
+        "Built storedProperties for identifier={} with {} entries and 
metadata.uuid={}",
+        identifier,
+        storedProperties.size(),
+        storedProperties.get(IcebergTableLikeEntity.TABLE_UUID));
+    IcebergTableLikeEntity updatedEntity =
+        new IcebergTableLikeEntity.Builder(existingEntity)
+            .setInternalProperties(storedProperties)
+            .setMetadataLocation(metadataFileLocation)
+            .build();
+
+    LOGGER.debug(
+        "Updating stored entity for identifier={} (entityId={}) with new 
metadataLocation={}",
+        identifier,
+        existingEntity.getId(),
+        metadataFileLocation);
+    updateTableLike(identifier, updatedEntity);
+    LOGGER.debug("updateTableLike succeeded for identifier={}", identifier);
+
+    // Refresh TableOperations so the in-memory table reflects the new 
metadata.
+    LOGGER.debug("Refreshing TableOperations for identifier={}", identifier);
+    TableOperations ops = newTableOps(identifier);
+    ops.refresh();
+    LOGGER.debug("TableOperations refresh completed for identifier={}", 
identifier);
+    Table result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
+    LOGGER.debug(
+        "Returning BaseTable for identifier={} with tableName={}", identifier, result.name());
+    return result;

Review Comment:
   ```suggestion
       return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -330,6 +380,155 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
     return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
   }
 
+  private Table overwriteRegisteredTable(
+      TableIdentifier identifier, String metadataFileLocation, String 
locationDir) {
+    LOGGER.debug(
+        "overwriteRegisteredTable called with identifier={}, 
metadataFileLocation={}, locationDir={}",
+        identifier,
+        metadataFileLocation,
+        locationDir);
+
+    /*
+     * High-level overview:
+     * - Resolve the authorized parent for the table so we know the storage 
context.
+     * - Validate and read the provided metadata file using a FileIO tied to 
that
+     *   storage context.
+     * - Ensure the target entity exists and is the correct table-like subtype.
+     * - Replace the stored entity properties (including metadata-location) 
with
+     *   values derived from the parsed TableMetadata and persist the change.
+     */
+
+    // Resolve the table path to obtain storage context for overwrite 
validation.
+    PolarisResolvedPathWrapper resolvedPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ANY_SUBTYPE);
+    if (resolvedPath == null || resolvedPath.getRawLeafEntity() == null) {
+      LOGGER.debug("No resolved path or leaf entity found for identifier={}", 
identifier);
+      throw new NoSuchTableException("Table does not exist: %s", identifier);
+    }
+    LOGGER.debug(
+        "Resolved path for identifier={} -> leafEntityId={}, 
leafEntityName={}",
+        identifier,
+        resolvedPath.getRawLeafEntity().getId(),
+        resolvedPath.getRawLeafEntity().getName());
+
+    // Validate the supplied metadata file location against the resolved 
storage.
+    LOGGER.debug(
+        "Validating supplied metadataFileLocation={} against resolved storage 
for identifier={}",
+        metadataFileLocation,
+        identifier);
+    validateLocationForTableLike(identifier, metadataFileLocation, 
resolvedPath);
+
+    // Configure FileIO for the resolved storage and read the metadata file.
+    LOGGER.debug(
+        "Loading FileIO for identifier={} to read metadata at {}", identifier, 
locationDir);
+    FileIO fileIO =
+        loadFileIOForTableLike(
+            identifier,
+            Set.of(locationDir),
+            resolvedPath,
+            new HashMap<>(tableDefaultProperties),
+            Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
+
+    LOGGER.debug(
+        "Reading TableMetadata from metadataFileLocation={} using resolved 
FileIO",
+        metadataFileLocation);
+    TableMetadata metadata = TableMetadataParser.read(fileIO, 
metadataFileLocation);
+    LOGGER.debug(
+        "Parsed TableMetadata for identifier={} -> uuid={}, location={}",
+        identifier,
+        metadata.uuid(),
+        metadata.location());
+
+    LOGGER.debug(
+        "Validating metadata.location()={} for identifier={}", 
metadata.location(), identifier);
+    validateLocationForTableLike(identifier, metadata.location(), 
resolvedPath);
+    validateMetadataFileInTableDir(identifier, metadata);
+    LOGGER.debug(
+        "Metadata file validated to be within table directory for 
identifier={}", identifier);
+
+    PolarisResolvedPathWrapper resolvedStorageEntity = resolvedPath;
+
+    List<PolarisEntity> resolvedNamespace = resolvedPath.getRawParentPath();
+    Set<String> dataLocations =
+        StorageUtil.getLocationsUsedByTable(metadata.location(), 
metadata.properties());
+    LOGGER.debug(
+        "Validating data locations {} for identifier={} against resolved 
storage entity",
+        dataLocations,
+        identifier);
+    // Mirror updateTable location validation to prevent invalid or 
overlapping locations.
+    CatalogUtils.validateLocationsForTableLike(
+        realmConfig, identifier, dataLocations, resolvedStorageEntity);
+    dataLocations.forEach(
+        location ->
+            validateNoLocationOverlap(
+                catalogEntity,
+                identifier,
+                resolvedNamespace,
+                location,
+                resolvedStorageEntity.getRawLeafEntity()));
+    LOGGER.debug("Location overlap validation completed for identifier={}", 
identifier);
+
+    // Ensure the raw entity is an Iceberg table-like entity (not a 
view/generic table).
+    PolarisEntity rawEntity = resolvedPath.getRawLeafEntity();
+    LOGGER.debug(
+        "Existing entity for identifier={} has subType={}", identifier, 
rawEntity.getSubType());
+    if (rawEntity.getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
+      LOGGER.debug(
+          "Existing entity is a view for identifier={} - cannot overwrite as 
table", identifier);
+      throw new AlreadyExistsException("View with same name already exists: %s", identifier);
+    } else if (rawEntity.getSubType() == PolarisEntitySubType.GENERIC_TABLE) {
+      LOGGER.debug(
+          "Existing entity is a generic table for identifier={} - cannot overwrite as iceberg table",
+          identifier);
+      throw new AlreadyExistsException(
+          "Generic table with same name already exists: %s", identifier);
+    }

Review Comment:
   I think we should check directly whether it is an Iceberg table. Otherwise the logic is
   fragile when a new `PolarisEntitySubType` is introduced, such as a UDF.
   ```suggestion
       if (rawEntity.getSubType() != PolarisEntitySubType.ICEBERG_TABLE) {
         throw new AlreadyExistsException(...);
       }
   ```
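
To illustrate why the allow-list form is more robust, here is a small self-contained sketch. The enum below is a hypothetical stand-in for `PolarisEntitySubType`, and its `UDF` constant is invented purely for the example; it is not an existing Polaris subtype.

```java
/** Hypothetical illustration only; SubType is not the real PolarisEntitySubType. */
final class SubTypeCheckSketch {

  enum SubType { ICEBERG_TABLE, ICEBERG_VIEW, GENERIC_TABLE, UDF }

  /** Deny-list: rejects only the subtypes that were known when the check was written. */
  static boolean denyListAllowsOverwrite(SubType subType) {
    return subType != SubType.ICEBERG_VIEW && subType != SubType.GENERIC_TABLE;
  }

  /** Allow-list: accepts only Iceberg tables, so any future subtype is rejected by default. */
  static boolean allowListAllowsOverwrite(SubType subType) {
    return subType == SubType.ICEBERG_TABLE;
  }
}
```

In this sketch the deny-list check lets the newly added `UDF` value through, while the allow-list check rejects it without any further code change.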


