dennishuo commented on code in PR #3237:
URL: https://github.com/apache/polaris/pull/3237#discussion_r2613428814


##########
persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java:
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.metastore.mutation;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder;
+import static 
org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren;
+import static 
org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.core.persistence.dao.entity.BaseResult;
+import org.apache.polaris.core.storage.StorageLocation;
+import org.apache.polaris.persistence.nosql.api.commit.CommitterState;
+import org.apache.polaris.persistence.nosql.api.index.IndexKey;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.coretypes.ContainerObj;
+import org.apache.polaris.persistence.nosql.coretypes.ObjBase;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet;
+import org.apache.polaris.persistence.nosql.coretypes.changes.Change;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate;
+import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj;
+import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier;
+import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult;
+import 
org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public record MutationAttempt(
+    UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+    List<EntityUpdate> updates,
+    CommitterState<? extends ContainerObj, MutationResults> state,
+    UpdatableIndex<ObjRef> byName,
+    UpdatableIndex<IndexKey> byId,
+    UpdatableIndex<Change> changes,
+    UpdatableIndex<EntityIdSet> locations,
+    MemoizedIndexedAccess memoizedIndexedAccess) {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MutationAttempt.class);
+
+  public static ObjBase objForChangeComparison(
+      PolarisBaseEntity entity,
+      Optional<PolarisPrincipalSecrets> currentSecrets,
+      ObjBase originalObj) {
+    return mapToObj(entity, currentSecrets)
+        .updateTimestamp(originalObj.createTimestamp())
+        .id(originalObj.id())
+        .numParts(originalObj.numParts())
+        .entityVersion(originalObj.entityVersion())
+        .createTimestamp(originalObj.createTimestamp())
+        .build();
+  }
+
+  public ChangeResult<MutationResults> apply() {
+    var mutationResults = newMutableMutationResults();
+    for (var update : updates) {
+      LOGGER.debug("Processing update {}", update);
+
+      switch (update.operation()) {
+        case CREATE ->
+            applyEntityCreateMutation(
+                updateKeyForCatalogAndEntityType,
+                state,
+                byName,
+                byId,
+                changes,
+                locations,
+                update,
+                mutationResults);
+        case UPDATE ->
+            applyEntityUpdateMutation(
+                updateKeyForCatalogAndEntityType,
+                state,
+                byName,
+                byId,
+                changes,
+                locations,
+                update,
+                mutationResults);
+        case DELETE ->
+            applyEntityDeleteMutation(
+                updateKeyForCatalogAndEntityType,
+                state,
+                byName,
+                byId,
+                changes,
+                locations,
+                update,
+                mutationResults,
+                memoizedIndexedAccess);
+        default -> throw new IllegalStateException("Unexpected operation " + 
update.operation());
+      }
+    }
+
+    var doCommit = mutationResults.anyChange && !mutationResults.hardFailure;
+    LOGGER.debug(
+        "{} changes (has changes: {}, failures: {})",
+        doCommit ? "Committing" : "Not committing",
+        mutationResults.anyChange,
+        mutationResults.failuresAsString());
+
+    return doCommit
+        ? new ChangeResult.CommitChange<>(mutationResults)
+        : new ChangeResult.NoChange<>(mutationResults);
+  }
+
+  private static void applyEntityDeleteMutation(
+      UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+      CommitterState<? extends ContainerObj, MutationResults> state,
+      UpdatableIndex<ObjRef> byName,
+      UpdatableIndex<IndexKey> byId,
+      UpdatableIndex<Change> changes,
+      UpdatableIndex<EntityIdSet> locations,
+      EntityUpdate update,
+      MutationResults mutationResults,
+      MemoizedIndexedAccess memoizedIndexedAccess) {
+    var entity = update.entity();
+    var entityType = entity.getType();
+    var persistence = state.persistence();
+
+    var entityIdKey = IndexKey.key(entity.getId());
+    var originalNameKey = byId.get(entityIdKey);
+
+    if (originalNameKey == null) {
+      mutationResults.dropResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalRef = byName.get(originalNameKey);
+    if (originalRef == null) {
+      mutationResults.dropResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalObj =
+        (ObjBase)
+            state
+                .persistence()
+                .fetch(
+                    originalRef,
+                    objTypeForPolarisType(entityType, 
entity.getSubType()).targetClass());
+    if (originalObj == null) {
+      mutationResults.dropResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    if (entity.getEntityVersion() != originalObj.entityVersion()) {
+      mutationResults.dropResult(TARGET_ENTITY_CONCURRENTLY_MODIFIED);
+      return;
+    }
+    if (entity.cannotBeDroppedOrRenamed()) {
+      mutationResults.dropResult(ENTITY_UNDROPPABLE);
+      return;
+    }
+
+    updateLocationsIndex(locations, originalObj, null);
+
+    var ok =
+        switch (entityType) {
+          case NAMESPACE -> {
+            if (hasChildren(
+                updateKeyForCatalogAndEntityType.catalogId(), byName, byId, 
entity.getId())) {
+              mutationResults.dropResult(NAMESPACE_NOT_EMPTY);
+              yield false;
+            }
+            yield true;
+          }
+          case CATALOG -> {
+            var catalogState = 
memoizedIndexedAccess.catalogContent(entity.getId());
+
+            if (catalogState.nameIndex().map(idx -> 
idx.iterator().hasNext()).orElse(false)) {
+              mutationResults.dropResult(NAMESPACE_NOT_EMPTY);
+              yield false;
+            }
+
+            // VALIDATION LOGIC COPIED
+
+            var catalogRolesAccess =
+                memoizedIndexedAccess.indexedAccess(
+                    entity.getId(), PolarisEntityType.CATALOG_ROLE.getCode());
+            var numCatalogRoles =
+                catalogRolesAccess
+                    .nameIndex()
+                    .map(
+                        idx -> {
+                          var iter = idx.iterator();
+                          var cnt = 0;
+                          if (iter.hasNext()) {
+                            iter.next();
+                            cnt++;
+                          }
+                          if (iter.hasNext()) {
+                            iter.next();
+                            cnt++;
+                          }
+                          return cnt;
+                        })
+                    .orElse(0);
+
+            // If we have 2, we cannot drop the catalog.
+            // If only one left, better be the admin role
+            if (numCatalogRoles > 1) {
+              mutationResults.dropResult(CATALOG_NOT_EMPTY);
+              yield false;
+            }
+            // If 1, drop the last catalog role.
+            // Should be the catalog admin role, but don't validate this.
+            // (No need to drop the catalog role here, it'll be eventually 
done by
+            // persistence-maintenance!)
+
+            yield true;
+          }
+          case POLICY ->
+              memoizedIndexedAccess
+                  .referenceHead(POLICY_MAPPINGS_REF_NAME, 
PolicyMappingsObj.class)
+                  .map(
+                      policyMappingsObj -> {
+                        var index =
+                            policyMappingsObj
+                                .policyMappings()
+                                .indexForRead(persistence, 
POLICY_MAPPING_SERIALIZER);
+
+                        var prefixKey = policyIndexPrefixKey((PolicyObj) 
originalObj, entity);
+
+                        var iter = index.iterator(prefixKey, prefixKey, false);
+
+                        if (iter.hasNext() && !update.cleanup()) {
+                          mutationResults.dropResult(POLICY_HAS_MAPPINGS);
+                          return false;
+                        }
+
+                        while (iter.hasNext()) {
+                          var elem = iter.next();
+                          var key = 
PolicyMappingsObj.PolicyMappingKey.fromIndexKey(elem.getKey());
+                          var reversed = key.reverse();
+
+                          
mutationResults.addPolicyIndexKeyToRemove(elem.getKey());
+                          
mutationResults.addPolicyIndexKeyToRemove(reversed.toIndexKey());
+                        }
+
+                        return true;
+                      })
+                  .orElse(true);
+          default -> true;
+        };
+    if (ok) {
+      byId.remove(entityIdKey);
+      byName.remove(requireNonNull(originalNameKey));
+      mutationResults.dropResult(entity);
+
+      if (changes != null) {
+        changes.put(originalNameKey, ChangeRemove.builder().build());
+      }
+    }
+  }
+
+  private static void applyEntityUpdateMutation(
+      UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+      CommitterState<? extends ContainerObj, MutationResults> state,
+      UpdatableIndex<ObjRef> byName,
+      UpdatableIndex<IndexKey> byId,
+      UpdatableIndex<Change> changes,
+      UpdatableIndex<EntityIdSet> locations,
+      EntityUpdate update,
+      MutationResults mutationResults) {
+    var entity = update.entity();
+    var entityType = entity.getType();
+    var persistence = state.persistence();
+    var now = persistence.currentInstant();
+
+    var entityIdKey = IndexKey.key(entity.getId());
+    var originalNameKey = byId.get(entityIdKey);
+
+    var entityParentId = entity.getParentId();
+
+    if (originalNameKey == null) {
+      mutationResults.entityResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalRef = byName.get(originalNameKey);
+    if (originalRef == null) {
+      mutationResults.entityResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalObj =
+        (ObjBase)
+            state
+                .persistence()
+                .fetch(
+                    originalRef,
+                    objTypeForPolarisType(entityType, 
entity.getSubType()).targetClass());
+    if (originalObj == null) {
+      mutationResults.entityResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    if (entity.getEntityVersion() != originalObj.entityVersion()) {
+      mutationResults.entityResult(TARGET_ENTITY_CONCURRENTLY_MODIFIED);
+      return;
+    }
+
+    var currentSecrets = maybeObjToPolarisPrincipalSecrets(originalObj);
+
+    var renameOrMove =
+        entityParentId != originalObj.parentStableId()
+            || !entity.getName().equals(originalObj.name());
+
+    if (renameOrMove) {
+      if (entity.cannotBeDroppedOrRenamed()) {
+        mutationResults.entityResult(ENTITY_CANNOT_BE_RENAMED);
+        return;
+      }
+      if (!byName.remove(originalNameKey)) {

Review Comment:
   Does `byName` represent a fixed overall version for the duration of this 
method or is it a read-through to the "head" version of persistence?
   
   My understanding is that it's a fixed snapshot that then collects updates to 
apply at the end; is that correct? As in, the "immutable object index" from 
your design doc?
   
   If so, does that mean that, since we already validated 
`byName.get(originalNameKey)` on line 326, the `remove` here *must* 
succeed, or else we have a code bug? In that case this should probably be an 
assertion or an `IllegalStateException` rather than returning 
`ENTITY_NOT_FOUND`, because some assumption must have gone wrong.
   
   Or, if `byName` is indeed subject to concurrent removal from somewhere else, 
please add some comments around here indicating that this is possible, and 
maybe additional comments around any other accesses to the index in this 
method, to help explain how we safely reconcile the changing state.



##########
persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java:
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.metastore.mutation;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder;
+import static 
org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren;
+import static 
org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.core.persistence.dao.entity.BaseResult;
+import org.apache.polaris.core.storage.StorageLocation;
+import org.apache.polaris.persistence.nosql.api.commit.CommitterState;
+import org.apache.polaris.persistence.nosql.api.index.IndexKey;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.coretypes.ContainerObj;
+import org.apache.polaris.persistence.nosql.coretypes.ObjBase;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet;
+import org.apache.polaris.persistence.nosql.coretypes.changes.Change;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate;
+import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj;
+import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier;
+import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult;
+import 
org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public record MutationAttempt(
+    UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+    List<EntityUpdate> updates,
+    CommitterState<? extends ContainerObj, MutationResults> state,
+    UpdatableIndex<ObjRef> byName,
+    UpdatableIndex<IndexKey> byId,
+    UpdatableIndex<Change> changes,
+    UpdatableIndex<EntityIdSet> locations,
+    MemoizedIndexedAccess memoizedIndexedAccess) {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MutationAttempt.class);
+
+  public static ObjBase objForChangeComparison(

Review Comment:
   Please add a Javadoc comment for this one explaining its intent and which 
fields it canonicalizes for comparison (and how).
   
   Is this to detect cases where a *real* update actually happens to result in 
an entity whose full contents are unchanged, or is there a common concurrency 
situation that is expected to cause "no change" mutations (e.g. on some kind of 
retry)?



##########
persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java:
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.metastore.mutation;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder;
+import static 
org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren;
+import static 
org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.core.persistence.dao.entity.BaseResult;
+import org.apache.polaris.core.storage.StorageLocation;
+import org.apache.polaris.persistence.nosql.api.commit.CommitterState;
+import org.apache.polaris.persistence.nosql.api.index.IndexKey;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.coretypes.ContainerObj;
+import org.apache.polaris.persistence.nosql.coretypes.ObjBase;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet;
+import org.apache.polaris.persistence.nosql.coretypes.changes.Change;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate;
+import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj;
+import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier;
+import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult;
+import 
org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public record MutationAttempt(
+    UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+    List<EntityUpdate> updates,
+    CommitterState<? extends ContainerObj, MutationResults> state,
+    UpdatableIndex<ObjRef> byName,
+    UpdatableIndex<IndexKey> byId,
+    UpdatableIndex<Change> changes,
+    UpdatableIndex<EntityIdSet> locations,
+    MemoizedIndexedAccess memoizedIndexedAccess) {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MutationAttempt.class);
+
+  public static ObjBase objForChangeComparison(
+      PolarisBaseEntity entity,
+      Optional<PolarisPrincipalSecrets> currentSecrets,
+      ObjBase originalObj) {
+    return mapToObj(entity, currentSecrets)
+        .updateTimestamp(originalObj.createTimestamp())
+        .id(originalObj.id())
+        .numParts(originalObj.numParts())
+        .entityVersion(originalObj.entityVersion())
+        .createTimestamp(originalObj.createTimestamp())
+        .build();
+  }
+
+  public ChangeResult<MutationResults> apply() {
+    var mutationResults = newMutableMutationResults();
+    for (var update : updates) {
+      LOGGER.debug("Processing update {}", update);
+
+      switch (update.operation()) {
+        case CREATE ->
+            applyEntityCreateMutation(
+                updateKeyForCatalogAndEntityType,
+                state,
+                byName,
+                byId,
+                changes,
+                locations,
+                update,
+                mutationResults);
+        case UPDATE ->
+            applyEntityUpdateMutation(
+                updateKeyForCatalogAndEntityType,
+                state,
+                byName,
+                byId,
+                changes,
+                locations,
+                update,
+                mutationResults);
+        case DELETE ->
+            applyEntityDeleteMutation(
+                updateKeyForCatalogAndEntityType,
+                state,
+                byName,
+                byId,
+                changes,
+                locations,
+                update,
+                mutationResults,
+                memoizedIndexedAccess);
+        default -> throw new IllegalStateException("Unexpected operation " + 
update.operation());
+      }
+    }
+
+    var doCommit = mutationResults.anyChange && !mutationResults.hardFailure;
+    LOGGER.debug(
+        "{} changes (has changes: {}, failures: {})",
+        doCommit ? "Committing" : "Not committing",
+        mutationResults.anyChange,
+        mutationResults.failuresAsString());
+
+    return doCommit
+        ? new ChangeResult.CommitChange<>(mutationResults)
+        : new ChangeResult.NoChange<>(mutationResults);
+  }
+
+  private static void applyEntityDeleteMutation(
+      UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+      CommitterState<? extends ContainerObj, MutationResults> state,
+      UpdatableIndex<ObjRef> byName,
+      UpdatableIndex<IndexKey> byId,
+      UpdatableIndex<Change> changes,
+      UpdatableIndex<EntityIdSet> locations,
+      EntityUpdate update,
+      MutationResults mutationResults,
+      MemoizedIndexedAccess memoizedIndexedAccess) {
+    var entity = update.entity();
+    var entityType = entity.getType();
+    var persistence = state.persistence();
+
+    var entityIdKey = IndexKey.key(entity.getId());
+    var originalNameKey = byId.get(entityIdKey);
+
+    if (originalNameKey == null) {
+      mutationResults.dropResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalRef = byName.get(originalNameKey);
+    if (originalRef == null) {
+      mutationResults.dropResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalObj =
+        (ObjBase)
+            state
+                .persistence()
+                .fetch(
+                    originalRef,
+                    objTypeForPolarisType(entityType, 
entity.getSubType()).targetClass());
+    if (originalObj == null) {
+      mutationResults.dropResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    if (entity.getEntityVersion() != originalObj.entityVersion()) {
+      mutationResults.dropResult(TARGET_ENTITY_CONCURRENTLY_MODIFIED);
+      return;
+    }
+    if (entity.cannotBeDroppedOrRenamed()) {
+      mutationResults.dropResult(ENTITY_UNDROPPABLE);
+      return;
+    }
+
+    updateLocationsIndex(locations, originalObj, null);
+
+    var ok =
+        switch (entityType) {
+          case NAMESPACE -> {
+            if (hasChildren(
+                updateKeyForCatalogAndEntityType.catalogId(), byName, byId, 
entity.getId())) {
+              mutationResults.dropResult(NAMESPACE_NOT_EMPTY);
+              yield false;
+            }
+            yield true;
+          }
+          case CATALOG -> {
+            var catalogState = 
memoizedIndexedAccess.catalogContent(entity.getId());
+
+            if (catalogState.nameIndex().map(idx -> 
idx.iterator().hasNext()).orElse(false)) {
+              mutationResults.dropResult(NAMESPACE_NOT_EMPTY);
+              yield false;
+            }
+
+            // VALIDATION LOGIC COPIED
+
+            var catalogRolesAccess =
+                memoizedIndexedAccess.indexedAccess(
+                    entity.getId(), PolarisEntityType.CATALOG_ROLE.getCode());
+            var numCatalogRoles =
+                catalogRolesAccess
+                    .nameIndex()
+                    .map(
+                        idx -> {
+                          var iter = idx.iterator();
+                          var cnt = 0;
+                          if (iter.hasNext()) {
+                            iter.next();
+                            cnt++;
+                          }
+                          if (iter.hasNext()) {
+                            iter.next();
+                            cnt++;
+                          }
+                          return cnt;
+                        })
+                    .orElse(0);
+
+            // If we have 2, we cannot drop the catalog.
+            // If only one left, better be the admin role
+            if (numCatalogRoles > 1) {
+              mutationResults.dropResult(CATALOG_NOT_EMPTY);
+              yield false;
+            }
+            // If 1, drop the last catalog role.
+            // Should be the catalog admin role, but don't validate this.
+            // (No need to drop the catalog role here, it'll be eventually 
done by
+            // persistence-maintenance!)
+
+            yield true;
+          }
+          case POLICY ->
+              memoizedIndexedAccess
+                  .referenceHead(POLICY_MAPPINGS_REF_NAME, 
PolicyMappingsObj.class)
+                  .map(
+                      policyMappingsObj -> {
+                        var index =
+                            policyMappingsObj
+                                .policyMappings()
+                                .indexForRead(persistence, 
POLICY_MAPPING_SERIALIZER);
+
+                        var prefixKey = policyIndexPrefixKey((PolicyObj) 
originalObj, entity);
+
+                        var iter = index.iterator(prefixKey, prefixKey, false);
+
+                        if (iter.hasNext() && !update.cleanup()) {
+                          mutationResults.dropResult(POLICY_HAS_MAPPINGS);
+                          return false;
+                        }
+
+                        while (iter.hasNext()) {
+                          var elem = iter.next();
+                          var key = 
PolicyMappingsObj.PolicyMappingKey.fromIndexKey(elem.getKey());
+                          var reversed = key.reverse();
+
+                          
mutationResults.addPolicyIndexKeyToRemove(elem.getKey());
+                          
mutationResults.addPolicyIndexKeyToRemove(reversed.toIndexKey());
+                        }
+
+                        return true;
+                      })
+                  .orElse(true);
+          default -> true;
+        };
+    if (ok) {
+      byId.remove(entityIdKey);
+      byName.remove(requireNonNull(originalNameKey));
+      mutationResults.dropResult(entity);
+
+      if (changes != null) {
+        changes.put(originalNameKey, ChangeRemove.builder().build());
+      }
+    }
+  }
+
+  private static void applyEntityUpdateMutation(
+      UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+      CommitterState<? extends ContainerObj, MutationResults> state,
+      UpdatableIndex<ObjRef> byName,
+      UpdatableIndex<IndexKey> byId,
+      UpdatableIndex<Change> changes,
+      UpdatableIndex<EntityIdSet> locations,
+      EntityUpdate update,
+      MutationResults mutationResults) {
+    var entity = update.entity();
+    var entityType = entity.getType();
+    var persistence = state.persistence();
+    var now = persistence.currentInstant();
+
+    var entityIdKey = IndexKey.key(entity.getId());
+    var originalNameKey = byId.get(entityIdKey);
+
+    var entityParentId = entity.getParentId();
+
+    if (originalNameKey == null) {
+      mutationResults.entityResult(ENTITY_NOT_FOUND);
+      return;
+    }
+    var originalRef = byName.get(originalNameKey);

Review Comment:
   What are the scenarios that can lead to this `byName` not having this 
`originalNameKey` that we just got from `byId`? Would it indicate `byId` and 
`byName` getting out of sync somehow?



##########
persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java:
##########
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.metastore.mutation;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS;
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets;
+import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString;
+import static 
org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder;
+import static 
org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren;
+import static 
org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.core.persistence.dao.entity.BaseResult;
+import org.apache.polaris.core.storage.StorageLocation;
+import org.apache.polaris.persistence.nosql.api.commit.CommitterState;
+import org.apache.polaris.persistence.nosql.api.index.IndexKey;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.coretypes.ContainerObj;
+import org.apache.polaris.persistence.nosql.coretypes.ObjBase;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet;
+import org.apache.polaris.persistence.nosql.coretypes.changes.Change;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename;
+import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate;
+import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj;
+import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier;
+import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult;
+import 
org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public record MutationAttempt(
+    UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType,
+    List<EntityUpdate> updates,
+    CommitterState<? extends ContainerObj, MutationResults> state,
+    UpdatableIndex<ObjRef> byName,
+    UpdatableIndex<IndexKey> byId,
+    UpdatableIndex<Change> changes,
+    UpdatableIndex<EntityIdSet> locations,
+    MemoizedIndexedAccess memoizedIndexedAccess) {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MutationAttempt.class);
+
+  public static ObjBase objForChangeComparison(
+      PolarisBaseEntity entity,
+      Optional<PolarisPrincipalSecrets> currentSecrets,
+      ObjBase originalObj) {
+    return mapToObj(entity, currentSecrets)
+        .updateTimestamp(originalObj.createTimestamp())

Review Comment:
   Is it intentional to use `originalObj.createTimestamp` as the 
`updateTimestamp` for the change-comparison object? If so, it definitely needs 
some extensive comments explaining why.
   
   If the intent is to canonicalize `updateTimestamp` to not be relevant for 
semantic comparison, it might be better to set it to 0L or something to 
indicate intentional/obvious missing values rather than having comparison 
objects floating around that have the `createTimestamp` written in.



##########
persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/ContentIdentifier.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.metastore;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.index.IndexKey;
+import org.apache.polaris.service.types.PolicyIdentifier;
+import org.immutables.value.Value;
+
+@Value.Style(underrideToString = "asDotDelimitedString")
+@PolarisImmutable
+public interface ContentIdentifier {
+  @Value.Parameter
+  @JsonValue
+  List<String> elements();
+
+  static ContentIdentifier identifier(List<String> elements) {
+    return ImmutableContentIdentifier.of(elements);
+  }
+
+  static ContentIdentifier identifier(String[] namespace, String name) {
+    return 
ImmutableContentIdentifier.builder().addElements(namespace).addElements(name).build();
+  }
+
+  static ContentIdentifier identifier(String... elements) {
+    return ImmutableContentIdentifier.of(List.of(elements));
+  }
+
+  static ContentIdentifier identifierFor(PolicyIdentifier identifier) {
+    return identifier(identifier.getNamespace().levels(), 
identifier.getName());
+  }
+
+  static ContentIdentifier identifierFor(TableIdentifier identifier) {
+    return identifier(identifier.namespace().levels(), identifier.name());
+  }
+
+  static ContentIdentifier identifierFor(Namespace namespace) {
+    return identifier(namespace.levels());
+  }
+
+  static ContentIdentifier identifierFromLocationString(String locationString) 
{
+    var builder = builder();
+    var len = locationString.length();
+    var off = -1;
+    for (var i = 0; i < len; i++) {
+      var c = locationString.charAt(i);
+      checkArgument(c >= ' ', "Control characters are forbidden in locations");
+      if (c == '/' || c == '\\') {

Review Comment:
   That raises an interesting point - we'd ideally be able to incorporate 
storage-type-specific canonicalization logic.
   
   In this code, for example, presumably the `\\` is specifically for "Windows local 
filesystem" use cases (which really should only be very limited test/dev 
scenarios where overlap-checks probably aren't relevant anyways; any 
"sensitive" production use cases would be really weird to run on a single-node 
Windows environment with engine and Polaris catalog in the same place, or else 
some NFS mount).
   
   But for a nio Filesystem, it's actually *correct* to canonicalize with real 
filesystem syntax, such as collapsing "foo/./././././bar" into "foo/bar" or 
"foo/bar/.." becoming "foo" when it comes to "overlap detection" because 
overlap detection is precisely supposed to prevent security issues caused by sharing 
directories. So it becomes important to avoid creative syntax circumventing our 
check.
   
   And yet, as mentioned before, the "local filesystem" use cases where paths 
have fancy syntactic interpretations also happen to be the cases where 
overlap-check probably doesn't actually matter.
   
   I guess it's fine to leave as-is for now, but custom parsing of paths is 
probably not something we want to keep as ad-hoc inline logic here in the 
long-term. It might be worth adding a comment or filing a GitHub issue to call 
out, for any future developers who might otherwise read it as gospel, that this 
logic isn't necessarily authoritative.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to