dimas-b commented on code in PR #4606:
URL: https://github.com/apache/polaris/pull/4606#discussion_r3357360359


##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -551,6 +565,18 @@ public static void enforceFeatureEnabledOrThrow(
           .defaultValue(false)
           .buildFeatureConfiguration();
 
+  public static final FeatureConfiguration<Boolean> DEFAULT_TABLE_LOCATION_USE_UUID =
+      PolarisConfiguration.<Boolean>builder()
+          .key("DEFAULT_TABLE_LOCATION_USE_UUID")

Review Comment:
   "UUID" is too impl.-specific, I think. Suggestion: 
`DEFAULT_TABLE_LOCATION_RANDOMIZATION_ENABLED`



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -458,9 +458,9 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
 
   @Override
   protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String namespaceLocation;
     if (tableIdentifier.namespace().isEmpty()) {
-      return SLASH.join(
-          defaultNamespaceLocation(tableIdentifier.namespace()), tableIdentifier.name());
+      namespaceLocation = defaultNamespaceLocation(tableIdentifier.namespace());

Review Comment:
   This is confusing. We know the result will be `defaultBaseLocation`.



##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -287,6 +287,20 @@ public static void enforceFeatureEnabledOrThrow(
           .defaultValue(false)
           .buildFeatureConfiguration();
 
+  public static final FeatureConfiguration<Boolean> ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION =
+      PolarisConfiguration.<Boolean>builder()
+          .key("ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION")
+          .catalogConfig("polaris.config.allow.client-specified.table.location")
+          .description(
+              "If set to true, Polaris honors a `location` (and the `write.data.path` / "
+                  + "`write.metadata.path` properties) explicitly supplied in a create-table or "
+                  + "create-view request (subject to the usual allowed-location and overlap "
+                  + "validation). If set to false (the default), such a request is rejected and "
+                  + "Polaris always generates the managed location. "
+                  + "See DEFAULT_TABLE_LOCATION_USE_UUID.")
+          .defaultValue(false)

Review Comment:
   This default contradicts current Polaris behaviour, does it not? This means
it would be a breaking change to existing deployments.
   
   Should we default to `true` and document the recommendation of using random 
locations instead?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -509,12 +511,105 @@ public LoadTableResponse createTableDirect(
     throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
   }
 
+  /**
+   * Table/view properties through which a caller can specify a storage 
location, in addition to the
+   * top-level {@code location} field. These redirect where data/metadata 
files are written, so they
+   * are gated by the same configuration as an explicit location.
+   */
+  private static final List<String> CLIENT_LOCATION_PROPERTY_KEYS =
+      List.of(
+          IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY,
+          IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY);
+
+  /**
+   * Rejects a caller-supplied table or view location unless the catalog is 
configured to allow it.
+   * By default Polaris generates the managed location for new tables and 
views; honoring a
+   * caller-specified location is opt-in (see {@link
+   * FeatureConfiguration#ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION}). This covers 
both the top-level
+   * {@code location} field and the {@code write.data.path} / {@code 
write.metadata.path}
+   * properties.
+   */
+  private void rejectClientSpecifiedLocationIfDisallowed(
+      String requestLocation, Map<String, String> requestProperties) {
+    enforceClientSpecifiedLocationAllowed(
+        clientSpecifiedLocationSource(requestLocation, requestProperties));
+  }
+
+  /**
+   * Rejects an update that changes a table/view location (via {@code 
SetLocation} or the {@code
+   * write.data.path} / {@code write.metadata.path} properties) unless the 
catalog is configured to
+   * allow caller-specified locations.
+   */
+  private void rejectClientSpecifiedLocationIfDisallowed(UpdateTableRequest 
request) {
+    
enforceClientSpecifiedLocationAllowed(clientSpecifiedLocationSource(request));
+  }
+
+  private void enforceClientSpecifiedLocationAllowed(String specifiedBy) {
+    // Federated catalogs delegate location handling to the remote catalog, so 
this constraint
+    // only applies to Polaris-managed catalogs.
+    if (specifiedBy == null || isFederated) {
+      return;
+    }
+    if (!realmConfig()
+        .getConfig(
+            FeatureConfiguration.ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION,
+            getResolvedCatalogEntity())) {
+      throw new BadRequestException(
+          "Specifying a table or view location (%s) is not allowed; set %s to true to allow it.",

Review Comment:
   Re: `set %s to true to allow it` - if we disallow it for a reason, we should
not suggest an "easy button" for bypassing this check. Instead, users should
read the docs and make a mindful decision, I think.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -469,9 +469,14 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
         throw noSuchNamespaceException(tableIdentifier.namespace());
       }
       List<PolarisEntity> namespacePath = resolvedNamespace.getRawFullPath();
-      String namespaceLocation = resolveLocationForPath(diagnostics, namespacePath);
-      return SLASH.join(namespaceLocation, tableIdentifier.name());
+      namespaceLocation = resolveLocationForPath(diagnostics, namespacePath);
     }
+    String location = SLASH.join(namespaceLocation, tableIdentifier.name());
+    if (realmConfig.getConfig(
+        FeatureConfiguration.DEFAULT_TABLE_LOCATION_USE_UUID, catalogEntity)) {
+      location = location + "-" + UUID.randomUUID();

Review Comment:
   `UUID.toString()` is too verbose (36 chars) for the amount of randomness the 
UUID contains. 
   
   Long paths risk overflowing max AWS session policy size (cred. vending).
   
   I propose `Long.toUnsignedString(randomLong, Character.MAX_RADIX)` padded to 
13 chars.
   
   If one random long is still considered "too risky" (which I do not think it 
is), we could still use two longs (same as UUID) and require only 26 chars.
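
A minimal sketch of the encoding proposed above, assuming a hypothetical helper class `LocationSuffix` (not part of the PR) and `SecureRandom` as the randomness source. It relies only on `Long.toUnsignedString(long, int)` and `Character.MAX_RADIX` (36); an unsigned 64-bit value needs at most 13 base-36 digits, so left-padding with zeros gives a fixed width.

```java
import java.security.SecureRandom;

// Hypothetical helper for illustration; the name and structure are assumptions.
final class LocationSuffix {
  // An unsigned 64-bit value never needs more than 13 digits in base 36.
  static final int WIDTH = 13;
  private static final SecureRandom RANDOM = new SecureRandom();

  /** Encodes one long as a fixed-width, lowercase base-36 string. */
  static String encode(long value) {
    String digits = Long.toUnsignedString(value, Character.MAX_RADIX);
    return "0".repeat(WIDTH - digits.length()) + digits;
  }

  /** One random long: 64 bits of randomness in 13 characters. */
  static String randomSuffix() {
    return encode(RANDOM.nextLong());
  }

  public static void main(String[] args) {
    System.out.println(randomSuffix());
    // Two longs carry as many bits as a UUID's two halves, in 26 characters.
    System.out.println(randomSuffix() + randomSuffix());
  }
}
```

Compared with the 36 characters of `UUID.toString()`, this keeps each table path 23 characters shorter for the single-long variant.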



##########
runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java:
##########
@@ -184,7 +195,12 @@ public Builder realmContext(RealmContext realmContext) {
     }
 
     public Builder config(Map<String, Object> config) {
-      this.config = config;
+      // Pin the location-related feature flags to their pre-UUID behavior unless the caller
+      // overrides them, so existing tests that pass caller-specified locations or assert
+      // structured default locations keep working under the new production defaults.
+      Map<String, Object> merged = new HashMap<>(LOCATION_TEST_DEFAULTS);
+      merged.putAll(config);
+      this.config = merged;

Review Comment:
   I'd rename this to `putConfig()` and just call `this.config.putAll(config)`
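
One way to read this suggestion, as a sketch only: the builder holds a mutable map seeded with the test defaults, and `putConfig()` layers caller overrides on top. The class `BuilderSketch` and the contents of `LOCATION_TEST_DEFAULTS` below are assumptions for illustration; the real `TestServices.Builder` has more state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for TestServices.Builder.
final class BuilderSketch {
  // Assumed contents; the PR's actual LOCATION_TEST_DEFAULTS is not shown in this thread.
  static final Map<String, Object> LOCATION_TEST_DEFAULTS =
      Map.of("DEFAULT_TABLE_LOCATION_USE_UUID", false);

  static final class Builder {
    // Seeded once with the defaults, so no per-call merge is needed.
    private final Map<String, Object> config = new HashMap<>(LOCATION_TEST_DEFAULTS);

    /** Adds or overrides entries; later calls win over earlier ones and over the defaults. */
    Builder putConfig(Map<String, Object> overrides) {
      this.config.putAll(overrides);
      return this;
    }

    Map<String, Object> config() {
      return Map.copyOf(config);
    }
  }
}
```

With this shape, the name says that entries accumulate rather than replace the whole map.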



##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -551,6 +565,18 @@ public static void enforceFeatureEnabledOrThrow(
           .defaultValue(false)
           .buildFeatureConfiguration();
 
+  public static final FeatureConfiguration<Boolean> DEFAULT_TABLE_LOCATION_USE_UUID =
+      PolarisConfiguration.<Boolean>builder()
+          .key("DEFAULT_TABLE_LOCATION_USE_UUID")
+          .catalogConfig("polaris.config.default-table-location.use-uuid")
+          .description(
+              "When enabled (the default), a managed location generated for a table or view "
+                  + "created without an explicit location has a random UUID appended, so that each "

Review Comment:
   Same here: I believe it is preferable to avoid referring to "UUID" 
specifically. Stating that the location is randomized should be sufficient for 
end users and allow more impl. flexibility in Polaris code.



##########
runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergTableLocationUuidTest.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.iceberg;
+
+import static 
org.apache.polaris.core.config.FeatureConfiguration.ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.DEFAULT_TABLE_LOCATION_USE_UUID;
+import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.common.collect.ImmutableMap;
+import jakarta.ws.rs.core.Response;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.util.LocationUtil;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
+import org.apache.polaris.service.TestServices;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests for the UUID-based default table location feature ({@link
+ * 
org.apache.polaris.core.config.FeatureConfiguration#DEFAULT_TABLE_LOCATION_USE_UUID})
 and the
+ * opt-in for caller-specified locations ({@link
+ * 
org.apache.polaris.core.config.FeatureConfiguration#ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION}).
+ */
+public class IcebergTableLocationUuidTest {
+
+  private static final String NAMESPACE = "ns";
+  private static final String CATALOG = "test-catalog";
+  // UUID v7
+  private static final UUID IDEMPOTENCY_KEY = new UUID(116617318654508422L, 
-7820829973016961092L);
+
+  private static final Map<String, Object> FILE_STORAGE_CONFIG =
+      Map.of(
+          "ALLOW_INSECURE_STORAGE_TYPES",
+          "true",
+          "SUPPORTED_CATALOG_STORAGE_TYPES",
+          List.of("FILE", "S3"));
+
+  private String getTableName() {
+    return "table_" + UUID.randomUUID();
+  }
+
+  /**
+   * Builds a server config of the shared file-storage settings plus the given 
feature overrides.
+   */
+  private static Map<String, Object> serverConfig(String... featureOverrides) {
+    ImmutableMap.Builder<String, Object> builder =
+        ImmutableMap.<String, Object>builder().putAll(FILE_STORAGE_CONFIG);
+    for (int i = 0; i < featureOverrides.length; i += 2) {
+      builder.put(featureOverrides[i], featureOverrides[i + 1]);
+    }
+    return builder.build();
+  }
+
+  private void createCatalogAndNamespace(
+      TestServices services, Map<String, String> catalogConfig, String 
catalogLocation) {
+    CatalogProperties.Builder propertiesBuilder =
+        CatalogProperties.builder()
+            .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, 
CATALOG))
+            .putAll(catalogConfig);
+    StorageConfigInfo config =
+        FileStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
+            .build();
+    Catalog catalogObject =
+        new Catalog(
+            Catalog.TypeEnum.INTERNAL,
+            CATALOG,
+            propertiesBuilder.build(),
+            1725487592064L,
+            1725487592064L,
+            1,
+            config);
+    try (Response response =
+        services
+            .catalogsApi()
+            .createCatalog(
+                new CreateCatalogRequest(catalogObject),
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
+    }
+
+    CreateNamespaceRequest createNamespaceRequest =
+        
CreateNamespaceRequest.builder().withNamespace(Namespace.of(NAMESPACE)).build();
+    try (Response response =
+        services
+            .restApi()
+            .createNamespace(
+                CATALOG,
+                createNamespaceRequest,
+                IDEMPOTENCY_KEY,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+    }
+  }
+
+  /** Creates a table without a caller-specified location and returns the 
location it was given. */
+  private String createTableWithName(TestServices services, String name) {
+    CreateTableRequest createTableRequest =
+        CreateTableRequest.builder().withName(name).withSchema(SCHEMA).build();
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                CATALOG,
+                NAMESPACE,
+                createTableRequest,
+                null,
+                IDEMPOTENCY_KEY,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return 
response.readEntity(LoadTableResponse.class).tableMetadata().location();
+    }
+  }
+
+  private Response.StatusType createTableWithLocation(TestServices services, 
String location) {
+    return submitCreateTable(
+        services,
+        CreateTableRequest.builder()
+            .withName(getTableName())
+            .withLocation(location)
+            .withSchema(SCHEMA)
+            .build());
+  }
+
+  /** Creates a table with no top-level location but the given property set. */
+  private Response.StatusType createTableWithProperty(
+      TestServices services, String propertyKey, String propertyValue) {
+    return submitCreateTable(
+        services,
+        CreateTableRequest.builder()
+            .withName(getTableName())
+            .withSchema(SCHEMA)
+            .setProperty(propertyKey, propertyValue)
+            .build());
+  }
+
+  private Response.StatusType submitCreateTable(TestServices services, 
CreateTableRequest request) {
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                CATALOG,
+                NAMESPACE,
+                request,
+                null,
+                IDEMPOTENCY_KEY,
+                services.realmContext(),
+                services.securityContext())) {
+      return response.getStatusInfo();
+    }
+  }
+
+  @Test
+  @DisplayName("Default table location has a UUID suffix when the feature is 
enabled")
+  void testDefaultLocationHasUuid(@TempDir Path tempDir) {
+    TestServices services =
+        TestServices.builder()
+            .config(serverConfig(DEFAULT_TABLE_LOCATION_USE_UUID.key(), 
"true"))
+            .build();
+    String baseLocation =
+        
LocationUtil.stripTrailingSlash(tempDir.toAbsolutePath().toUri().toString());
+    createCatalogAndNamespace(services, Map.of(), baseLocation);
+
+    String tableName = getTableName();
+    String location = createTableWithName(services, tableName);
+
+    String expectedPrefix =
+        String.format("%s/%s/%s/%s-", baseLocation, CATALOG, NAMESPACE, 
tableName);
+    assertThat(location).startsWith(expectedPrefix);
+    String suffix = location.substring(expectedPrefix.length());
+    // The suffix must be a parseable UUID, i.e. unique and unpredictable.
+    assertThat(UUID.fromString(suffix)).isNotNull();
+  }
+
+  @Test
+  @DisplayName("Sibling tables with adjacent names get non-overlapping UUID 
locations")
+  void testSiblingTablesDoNotOverlap(@TempDir Path tempDir) {
+    TestServices services =
+        TestServices.builder()
+            .config(serverConfig(DEFAULT_TABLE_LOCATION_USE_UUID.key(), 
"true"))
+            .build();
+    String baseLocation =
+        
LocationUtil.stripTrailingSlash(tempDir.toAbsolutePath().toUri().toString());
+    createCatalogAndNamespace(services, Map.of(), baseLocation);
+
+    // "t1" and "t1a" are adjacent by name and would collide under prefix matching without a UUID.
+    String loc1 = LocationUtil.stripTrailingSlash(createTableWithName(services, "t1"));
+    String loc2 = LocationUtil.stripTrailingSlash(createTableWithName(services, "t1a"));
+
+    assertThat(loc1).isNotEqualTo(loc2);
+    assertThat(loc1 + "/").doesNotStartWith(loc2 + "/");

Review Comment:
   I do not see value in this test given that other tests ensure the server 
adds location suffixes in this case.



##########
runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergTableLocationUuidTest.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.iceberg;
+
+import static 
org.apache.polaris.core.config.FeatureConfiguration.ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.DEFAULT_TABLE_LOCATION_USE_UUID;
+import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.common.collect.ImmutableMap;
+import jakarta.ws.rs.core.Response;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.util.LocationUtil;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
+import org.apache.polaris.service.TestServices;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests for the UUID-based default table location feature ({@link
+ * 
org.apache.polaris.core.config.FeatureConfiguration#DEFAULT_TABLE_LOCATION_USE_UUID})
 and the
+ * opt-in for caller-specified locations ({@link
+ * 
org.apache.polaris.core.config.FeatureConfiguration#ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION}).
+ */
+public class IcebergTableLocationUuidTest {
+
+  private static final String NAMESPACE = "ns";
+  private static final String CATALOG = "test-catalog";
+  // UUID v7
+  private static final UUID IDEMPOTENCY_KEY = new UUID(116617318654508422L, 
-7820829973016961092L);
+
+  private static final Map<String, Object> FILE_STORAGE_CONFIG =
+      Map.of(
+          "ALLOW_INSECURE_STORAGE_TYPES",
+          "true",
+          "SUPPORTED_CATALOG_STORAGE_TYPES",
+          List.of("FILE", "S3"));
+
+  private String getTableName() {
+    return "table_" + UUID.randomUUID();
+  }
+
+  /**
+   * Builds a server config of the shared file-storage settings plus the given 
feature overrides.
+   */
+  private static Map<String, Object> serverConfig(String... featureOverrides) {
+    ImmutableMap.Builder<String, Object> builder =
+        ImmutableMap.<String, Object>builder().putAll(FILE_STORAGE_CONFIG);
+    for (int i = 0; i < featureOverrides.length; i += 2) {
+      builder.put(featureOverrides[i], featureOverrides[i + 1]);
+    }
+    return builder.build();
+  }
+
+  private void createCatalogAndNamespace(
+      TestServices services, Map<String, String> catalogConfig, String 
catalogLocation) {
+    CatalogProperties.Builder propertiesBuilder =
+        CatalogProperties.builder()
+            .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, 
CATALOG))
+            .putAll(catalogConfig);
+    StorageConfigInfo config =
+        FileStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
+            .build();
+    Catalog catalogObject =
+        new Catalog(
+            Catalog.TypeEnum.INTERNAL,
+            CATALOG,
+            propertiesBuilder.build(),
+            1725487592064L,
+            1725487592064L,
+            1,
+            config);
+    try (Response response =
+        services
+            .catalogsApi()
+            .createCatalog(
+                new CreateCatalogRequest(catalogObject),
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
+    }
+
+    CreateNamespaceRequest createNamespaceRequest =
+        
CreateNamespaceRequest.builder().withNamespace(Namespace.of(NAMESPACE)).build();
+    try (Response response =
+        services
+            .restApi()
+            .createNamespace(
+                CATALOG,
+                createNamespaceRequest,
+                IDEMPOTENCY_KEY,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+    }
+  }
+
+  /** Creates a table without a caller-specified location and returns the 
location it was given. */
+  private String createTableWithName(TestServices services, String name) {
+    CreateTableRequest createTableRequest =
+        CreateTableRequest.builder().withName(name).withSchema(SCHEMA).build();
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                CATALOG,
+                NAMESPACE,
+                createTableRequest,
+                null,
+                IDEMPOTENCY_KEY,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return 
response.readEntity(LoadTableResponse.class).tableMetadata().location();
+    }
+  }
+
+  private Response.StatusType createTableWithLocation(TestServices services, 
String location) {
+    return submitCreateTable(
+        services,
+        CreateTableRequest.builder()
+            .withName(getTableName())
+            .withLocation(location)
+            .withSchema(SCHEMA)
+            .build());
+  }
+
+  /** Creates a table with no top-level location but the given property set. */
+  private Response.StatusType createTableWithProperty(
+      TestServices services, String propertyKey, String propertyValue) {
+    return submitCreateTable(
+        services,
+        CreateTableRequest.builder()
+            .withName(getTableName())
+            .withSchema(SCHEMA)
+            .setProperty(propertyKey, propertyValue)
+            .build());
+  }
+
+  private Response.StatusType submitCreateTable(TestServices services, 
CreateTableRequest request) {
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                CATALOG,
+                NAMESPACE,
+                request,
+                null,
+                IDEMPOTENCY_KEY,
+                services.realmContext(),
+                services.securityContext())) {
+      return response.getStatusInfo();
+    }
+  }
+
+  @Test
+  @DisplayName("Default table location has a UUID suffix when the feature is 
enabled")
+  void testDefaultLocationHasUuid(@TempDir Path tempDir) {
+    TestServices services =
+        TestServices.builder()
+            .config(serverConfig(DEFAULT_TABLE_LOCATION_USE_UUID.key(), 
"true"))
+            .build();
+    String baseLocation =
+        
LocationUtil.stripTrailingSlash(tempDir.toAbsolutePath().toUri().toString());
+    createCatalogAndNamespace(services, Map.of(), baseLocation);
+
+    String tableName = getTableName();
+    String location = createTableWithName(services, tableName);
+
+    String expectedPrefix =
+        String.format("%s/%s/%s/%s-", baseLocation, CATALOG, NAMESPACE, tableName);
+    assertThat(location).startsWith(expectedPrefix);
+    String suffix = location.substring(expectedPrefix.length());
+    // The suffix must be a parseable UUID, i.e. unique and unpredictable.
+    assertThat(UUID.fromString(suffix)).isNotNull();

Review Comment:
   How does this assert that the suffix is `unique and unpredictable`?
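
To illustrate the concern, a small sketch (the class `UuidParseCheck` is hypothetical): `UUID.fromString` only validates syntax, so a constant and fully predictable value passes the same check the test performs.

```java
import java.util.UUID;

// Hypothetical demo class; it mirrors the test's check rather than reusing PR code.
final class UuidParseCheck {
  /** Returns true whenever the text is syntactically a UUID, regardless of how it was produced. */
  static boolean parses(String suffix) {
    try {
      UUID.fromString(suffix);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // The all-zero UUID is neither unique nor unpredictable, yet it parses.
    System.out.println(parses("00000000-0000-0000-0000-000000000000"));
    // Malformed text is rejected, which is all the check can establish.
    System.out.println(parses("not-a-uuid"));
  }
}
```

A parse check therefore documents the suffix format, not its randomness.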



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -509,12 +511,105 @@ public LoadTableResponse createTableDirect(
     throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
   }
 
+  /**
+   * Table/view properties through which a caller can specify a storage 
location, in addition to the
+   * top-level {@code location} field. These redirect where data/metadata 
files are written, so they
+   * are gated by the same configuration as an explicit location.
+   */
+  private static final List<String> CLIENT_LOCATION_PROPERTY_KEYS =
+      List.of(
+          IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY,
+          IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY);
+
+  /**
+   * Rejects a caller-supplied table or view location unless the catalog is 
configured to allow it.
+   * By default Polaris generates the managed location for new tables and 
views; honoring a
+   * caller-specified location is opt-in (see {@link
+   * FeatureConfiguration#ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION}). This covers 
both the top-level
+   * {@code location} field and the {@code write.data.path} / {@code 
write.metadata.path}
+   * properties.
+   */
+  private void rejectClientSpecifiedLocationIfDisallowed(
+      String requestLocation, Map<String, String> requestProperties) {
+    enforceClientSpecifiedLocationAllowed(
+        clientSpecifiedLocationSource(requestLocation, requestProperties));
+  }
+
+  /**
+   * Rejects an update that changes a table/view location (via {@code 
SetLocation} or the {@code
+   * write.data.path} / {@code write.metadata.path} properties) unless the 
catalog is configured to
+   * allow caller-specified locations.
+   */
+  private void rejectClientSpecifiedLocationIfDisallowed(UpdateTableRequest 
request) {
+    
enforceClientSpecifiedLocationAllowed(clientSpecifiedLocationSource(request));
+  }
+
+  private void enforceClientSpecifiedLocationAllowed(String specifiedBy) {
+    // Federated catalogs delegate location handling to the remote catalog, so 
this constraint
+    // only applies to Polaris-managed catalogs.
+    if (specifiedBy == null || isFederated) {
+      return;
+    }
+    if (!realmConfig()
+        .getConfig(
+            FeatureConfiguration.ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION,
+            getResolvedCatalogEntity())) {
+      throw new BadRequestException(
+          "Specifying a table or view location (%s) is not allowed; set %s to true to allow it.",
+          specifiedBy, FeatureConfiguration.ALLOW_CLIENT_SPECIFIED_TABLE_LOCATION.catalogConfig());

Review Comment:
   `specifiedBy` can be "location", leading to a confusing message like
`Specifying a table or view location (location) is not allowed; [...]`.
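
One possible rewording, sketched with a hypothetical helper `LocationErrorMessage` and plain `String.format` (the wording is an assumption, not the PR's final text): naming the request element as the subject of the sentence avoids the doubled "location (location)".

```java
// Hypothetical helper for illustration; not part of the PR.
final class LocationErrorMessage {
  /** Builds the rejection text from the request element that carried the location. */
  static String rejectionMessage(String specifiedBy) {
    return String.format(
        "The request sets '%s', but caller-specified table or view locations "
            + "are not allowed for this catalog.",
        specifiedBy);
  }

  public static void main(String[] args) {
    System.out.println(rejectionMessage("location"));
    System.out.println(rejectionMessage("write.data.path"));
  }
}
```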


