dennishuo commented on code in PR #3237: URL: https://github.com/apache/polaris/pull/3237#discussion_r2613428814
########## persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java: ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.nosql.metastore.mutation; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY; +import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; +import static org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType; +import static org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER; +import static org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder; +import static org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren; +import static org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import 
org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.storage.StorageLocation; +import org.apache.polaris.persistence.nosql.api.commit.CommitterState; +import org.apache.polaris.persistence.nosql.api.index.IndexKey; +import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.coretypes.ContainerObj; +import org.apache.polaris.persistence.nosql.coretypes.ObjBase; +import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet; +import org.apache.polaris.persistence.nosql.coretypes.changes.Change; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate; +import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj; +import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj; +import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier; +import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult; +import org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public record MutationAttempt( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + List<EntityUpdate> updates, + CommitterState<? 
extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + MemoizedIndexedAccess memoizedIndexedAccess) { + + private static final Logger LOGGER = LoggerFactory.getLogger(MutationAttempt.class); + + public static ObjBase objForChangeComparison( + PolarisBaseEntity entity, + Optional<PolarisPrincipalSecrets> currentSecrets, + ObjBase originalObj) { + return mapToObj(entity, currentSecrets) + .updateTimestamp(originalObj.createTimestamp()) + .id(originalObj.id()) + .numParts(originalObj.numParts()) + .entityVersion(originalObj.entityVersion()) + .createTimestamp(originalObj.createTimestamp()) + .build(); + } + + public ChangeResult<MutationResults> apply() { + var mutationResults = newMutableMutationResults(); + for (var update : updates) { + LOGGER.debug("Processing update {}", update); + + switch (update.operation()) { + case CREATE -> + applyEntityCreateMutation( + updateKeyForCatalogAndEntityType, + state, + byName, + byId, + changes, + locations, + update, + mutationResults); + case UPDATE -> + applyEntityUpdateMutation( + updateKeyForCatalogAndEntityType, + state, + byName, + byId, + changes, + locations, + update, + mutationResults); + case DELETE -> + applyEntityDeleteMutation( + updateKeyForCatalogAndEntityType, + state, + byName, + byId, + changes, + locations, + update, + mutationResults, + memoizedIndexedAccess); + default -> throw new IllegalStateException("Unexpected operation " + update.operation()); + } + } + + var doCommit = mutationResults.anyChange && !mutationResults.hardFailure; + LOGGER.debug( + "{} changes (has changes: {}, failures: {})", + doCommit ? "Committing" : "Not committing", + mutationResults.anyChange, + mutationResults.failuresAsString()); + + return doCommit + ? 
new ChangeResult.CommitChange<>(mutationResults) + : new ChangeResult.NoChange<>(mutationResults); + } + + private static void applyEntityDeleteMutation( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + CommitterState<? extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + EntityUpdate update, + MutationResults mutationResults, + MemoizedIndexedAccess memoizedIndexedAccess) { + var entity = update.entity(); + var entityType = entity.getType(); + var persistence = state.persistence(); + + var entityIdKey = IndexKey.key(entity.getId()); + var originalNameKey = byId.get(entityIdKey); + + if (originalNameKey == null) { + mutationResults.dropResult(ENTITY_NOT_FOUND); + return; + } + var originalRef = byName.get(originalNameKey); + if (originalRef == null) { + mutationResults.dropResult(ENTITY_NOT_FOUND); + return; + } + var originalObj = + (ObjBase) + state + .persistence() + .fetch( + originalRef, + objTypeForPolarisType(entityType, entity.getSubType()).targetClass()); + if (originalObj == null) { + mutationResults.dropResult(ENTITY_NOT_FOUND); + return; + } + if (entity.getEntityVersion() != originalObj.entityVersion()) { + mutationResults.dropResult(TARGET_ENTITY_CONCURRENTLY_MODIFIED); + return; + } + if (entity.cannotBeDroppedOrRenamed()) { + mutationResults.dropResult(ENTITY_UNDROPPABLE); + return; + } + + updateLocationsIndex(locations, originalObj, null); + + var ok = + switch (entityType) { + case NAMESPACE -> { + if (hasChildren( + updateKeyForCatalogAndEntityType.catalogId(), byName, byId, entity.getId())) { + mutationResults.dropResult(NAMESPACE_NOT_EMPTY); + yield false; + } + yield true; + } + case CATALOG -> { + var catalogState = memoizedIndexedAccess.catalogContent(entity.getId()); + + if (catalogState.nameIndex().map(idx -> idx.iterator().hasNext()).orElse(false)) { + 
mutationResults.dropResult(NAMESPACE_NOT_EMPTY); + yield false; + } + + // VALIDATION LOGIC COPIED + + var catalogRolesAccess = + memoizedIndexedAccess.indexedAccess( + entity.getId(), PolarisEntityType.CATALOG_ROLE.getCode()); + var numCatalogRoles = + catalogRolesAccess + .nameIndex() + .map( + idx -> { + var iter = idx.iterator(); + var cnt = 0; + if (iter.hasNext()) { + iter.next(); + cnt++; + } + if (iter.hasNext()) { + iter.next(); + cnt++; + } + return cnt; + }) + .orElse(0); + + // If we have 2, we cannot drop the catalog. + // If only one left, better be the admin role + if (numCatalogRoles > 1) { + mutationResults.dropResult(CATALOG_NOT_EMPTY); + yield false; + } + // If 1, drop the last catalog role. + // Should be the catalog admin role, but don't validate this. + // (No need to drop the catalog role here, it'll be eventually done by + // persistence-maintenance!) + + yield true; + } + case POLICY -> + memoizedIndexedAccess + .referenceHead(POLICY_MAPPINGS_REF_NAME, PolicyMappingsObj.class) + .map( + policyMappingsObj -> { + var index = + policyMappingsObj + .policyMappings() + .indexForRead(persistence, POLICY_MAPPING_SERIALIZER); + + var prefixKey = policyIndexPrefixKey((PolicyObj) originalObj, entity); + + var iter = index.iterator(prefixKey, prefixKey, false); + + if (iter.hasNext() && !update.cleanup()) { + mutationResults.dropResult(POLICY_HAS_MAPPINGS); + return false; + } + + while (iter.hasNext()) { + var elem = iter.next(); + var key = PolicyMappingsObj.PolicyMappingKey.fromIndexKey(elem.getKey()); + var reversed = key.reverse(); + + mutationResults.addPolicyIndexKeyToRemove(elem.getKey()); + mutationResults.addPolicyIndexKeyToRemove(reversed.toIndexKey()); + } + + return true; + }) + .orElse(true); + default -> true; + }; + if (ok) { + byId.remove(entityIdKey); + byName.remove(requireNonNull(originalNameKey)); + mutationResults.dropResult(entity); + + if (changes != null) { + changes.put(originalNameKey, ChangeRemove.builder().build()); + } + 
} + } + + private static void applyEntityUpdateMutation( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + CommitterState<? extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + EntityUpdate update, + MutationResults mutationResults) { + var entity = update.entity(); + var entityType = entity.getType(); + var persistence = state.persistence(); + var now = persistence.currentInstant(); + + var entityIdKey = IndexKey.key(entity.getId()); + var originalNameKey = byId.get(entityIdKey); + + var entityParentId = entity.getParentId(); + + if (originalNameKey == null) { + mutationResults.entityResult(ENTITY_NOT_FOUND); + return; + } + var originalRef = byName.get(originalNameKey); + if (originalRef == null) { + mutationResults.entityResult(ENTITY_NOT_FOUND); + return; + } + var originalObj = + (ObjBase) + state + .persistence() + .fetch( + originalRef, + objTypeForPolarisType(entityType, entity.getSubType()).targetClass()); + if (originalObj == null) { + mutationResults.entityResult(ENTITY_NOT_FOUND); + return; + } + if (entity.getEntityVersion() != originalObj.entityVersion()) { + mutationResults.entityResult(TARGET_ENTITY_CONCURRENTLY_MODIFIED); + return; + } + + var currentSecrets = maybeObjToPolarisPrincipalSecrets(originalObj); + + var renameOrMove = + entityParentId != originalObj.parentStableId() + || !entity.getName().equals(originalObj.name()); + + if (renameOrMove) { + if (entity.cannotBeDroppedOrRenamed()) { + mutationResults.entityResult(ENTITY_CANNOT_BE_RENAMED); + return; + } + if (!byName.remove(originalNameKey)) { Review Comment: Does `byName` represent a fixed overall version for the duration of this method or is it a read-through to the "head" version of persistence? My understanding is that it's a fixed/snapshot that then collects updates to apply at the end, is that correct? 
As in, the "immutable object index" from your design doc? If so, since we already validated `byName.get(originalNameKey)` on line 326, wouldn't the `remove` here *have* to succeed, so that a failure could only mean a code bug? In that case, this should probably be an assertion or an `IllegalStateException` rather than a returned `ENTITY_NOT_FOUND`, because some assumption must have gone wrong. Or, if `byName` is indeed subject to concurrent removal from somewhere else, please add comments here saying that this is possible, and perhaps also around the other accesses to the index in this method, to explain how we safely reconcile the changing state. ########## persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java: ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.polaris.persistence.nosql.metastore.mutation; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; +import static org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType; +import static org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER; +import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder; +import static org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren; +import static org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.storage.StorageLocation; +import org.apache.polaris.persistence.nosql.api.commit.CommitterState; +import org.apache.polaris.persistence.nosql.api.index.IndexKey; +import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.coretypes.ContainerObj; +import org.apache.polaris.persistence.nosql.coretypes.ObjBase; +import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet; +import org.apache.polaris.persistence.nosql.coretypes.changes.Change; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate; +import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj; +import 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj; +import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier; +import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult; +import org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public record MutationAttempt( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + List<EntityUpdate> updates, + CommitterState<? extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + MemoizedIndexedAccess memoizedIndexedAccess) { + + private static final Logger LOGGER = LoggerFactory.getLogger(MutationAttempt.class); + + public static ObjBase objForChangeComparison( Review Comment: Please add a Javadoc comment to this method explaining its intent and which fields it canonicalizes for the comparison (and why). Is this meant to detect cases where a *real* update happens to produce an entity whose full contents are unchanged, or is there a common concurrency situation (e.g., some kind of retry) that is expected to cause "no change" mutations? ########## persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java: ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.nosql.metastore.mutation; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; +import static org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity; +import static 
org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType; +import static org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER; +import static org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder; +import static org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren; +import static org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.storage.StorageLocation; +import org.apache.polaris.persistence.nosql.api.commit.CommitterState; +import org.apache.polaris.persistence.nosql.api.index.IndexKey; +import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.coretypes.ContainerObj; +import org.apache.polaris.persistence.nosql.coretypes.ObjBase; +import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet; +import 
org.apache.polaris.persistence.nosql.coretypes.changes.Change; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate; +import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj; +import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj; +import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier; +import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult; +import org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public record MutationAttempt( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + List<EntityUpdate> updates, + CommitterState<? extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + MemoizedIndexedAccess memoizedIndexedAccess) { + + private static final Logger LOGGER = LoggerFactory.getLogger(MutationAttempt.class); + + public static ObjBase objForChangeComparison( + PolarisBaseEntity entity, + Optional<PolarisPrincipalSecrets> currentSecrets, + ObjBase originalObj) { + return mapToObj(entity, currentSecrets) + .updateTimestamp(originalObj.createTimestamp()) + .id(originalObj.id()) + .numParts(originalObj.numParts()) + .entityVersion(originalObj.entityVersion()) + .createTimestamp(originalObj.createTimestamp()) + .build(); + } + + public ChangeResult<MutationResults> apply() { + var mutationResults = newMutableMutationResults(); + for (var update : updates) { + LOGGER.debug("Processing update {}", update); + + switch (update.operation()) { + case CREATE -> + applyEntityCreateMutation( + 
updateKeyForCatalogAndEntityType, + state, + byName, + byId, + changes, + locations, + update, + mutationResults); + case UPDATE -> + applyEntityUpdateMutation( + updateKeyForCatalogAndEntityType, + state, + byName, + byId, + changes, + locations, + update, + mutationResults); + case DELETE -> + applyEntityDeleteMutation( + updateKeyForCatalogAndEntityType, + state, + byName, + byId, + changes, + locations, + update, + mutationResults, + memoizedIndexedAccess); + default -> throw new IllegalStateException("Unexpected operation " + update.operation()); + } + } + + var doCommit = mutationResults.anyChange && !mutationResults.hardFailure; + LOGGER.debug( + "{} changes (has changes: {}, failures: {})", + doCommit ? "Committing" : "Not committing", + mutationResults.anyChange, + mutationResults.failuresAsString()); + + return doCommit + ? new ChangeResult.CommitChange<>(mutationResults) + : new ChangeResult.NoChange<>(mutationResults); + } + + private static void applyEntityDeleteMutation( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + CommitterState<? 
extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + EntityUpdate update, + MutationResults mutationResults, + MemoizedIndexedAccess memoizedIndexedAccess) { + var entity = update.entity(); + var entityType = entity.getType(); + var persistence = state.persistence(); + + var entityIdKey = IndexKey.key(entity.getId()); + var originalNameKey = byId.get(entityIdKey); + + if (originalNameKey == null) { + mutationResults.dropResult(ENTITY_NOT_FOUND); + return; + } + var originalRef = byName.get(originalNameKey); + if (originalRef == null) { + mutationResults.dropResult(ENTITY_NOT_FOUND); + return; + } + var originalObj = + (ObjBase) + state + .persistence() + .fetch( + originalRef, + objTypeForPolarisType(entityType, entity.getSubType()).targetClass()); + if (originalObj == null) { + mutationResults.dropResult(ENTITY_NOT_FOUND); + return; + } + if (entity.getEntityVersion() != originalObj.entityVersion()) { + mutationResults.dropResult(TARGET_ENTITY_CONCURRENTLY_MODIFIED); + return; + } + if (entity.cannotBeDroppedOrRenamed()) { + mutationResults.dropResult(ENTITY_UNDROPPABLE); + return; + } + + updateLocationsIndex(locations, originalObj, null); + + var ok = + switch (entityType) { + case NAMESPACE -> { + if (hasChildren( + updateKeyForCatalogAndEntityType.catalogId(), byName, byId, entity.getId())) { + mutationResults.dropResult(NAMESPACE_NOT_EMPTY); + yield false; + } + yield true; + } + case CATALOG -> { + var catalogState = memoizedIndexedAccess.catalogContent(entity.getId()); + + if (catalogState.nameIndex().map(idx -> idx.iterator().hasNext()).orElse(false)) { + mutationResults.dropResult(NAMESPACE_NOT_EMPTY); + yield false; + } + + // VALIDATION LOGIC COPIED + + var catalogRolesAccess = + memoizedIndexedAccess.indexedAccess( + entity.getId(), PolarisEntityType.CATALOG_ROLE.getCode()); + var numCatalogRoles = + catalogRolesAccess 
+ .nameIndex() + .map( + idx -> { + var iter = idx.iterator(); + var cnt = 0; + if (iter.hasNext()) { + iter.next(); + cnt++; + } + if (iter.hasNext()) { + iter.next(); + cnt++; + } + return cnt; + }) + .orElse(0); + + // If we have 2, we cannot drop the catalog. + // If only one left, better be the admin role + if (numCatalogRoles > 1) { + mutationResults.dropResult(CATALOG_NOT_EMPTY); + yield false; + } + // If 1, drop the last catalog role. + // Should be the catalog admin role, but don't validate this. + // (No need to drop the catalog role here, it'll be eventually done by + // persistence-maintenance!) + + yield true; + } + case POLICY -> + memoizedIndexedAccess + .referenceHead(POLICY_MAPPINGS_REF_NAME, PolicyMappingsObj.class) + .map( + policyMappingsObj -> { + var index = + policyMappingsObj + .policyMappings() + .indexForRead(persistence, POLICY_MAPPING_SERIALIZER); + + var prefixKey = policyIndexPrefixKey((PolicyObj) originalObj, entity); + + var iter = index.iterator(prefixKey, prefixKey, false); + + if (iter.hasNext() && !update.cleanup()) { + mutationResults.dropResult(POLICY_HAS_MAPPINGS); + return false; + } + + while (iter.hasNext()) { + var elem = iter.next(); + var key = PolicyMappingsObj.PolicyMappingKey.fromIndexKey(elem.getKey()); + var reversed = key.reverse(); + + mutationResults.addPolicyIndexKeyToRemove(elem.getKey()); + mutationResults.addPolicyIndexKeyToRemove(reversed.toIndexKey()); + } + + return true; + }) + .orElse(true); + default -> true; + }; + if (ok) { + byId.remove(entityIdKey); + byName.remove(requireNonNull(originalNameKey)); + mutationResults.dropResult(entity); + + if (changes != null) { + changes.put(originalNameKey, ChangeRemove.builder().build()); + } + } + } + + private static void applyEntityUpdateMutation( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + CommitterState<? 
extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + EntityUpdate update, + MutationResults mutationResults) { + var entity = update.entity(); + var entityType = entity.getType(); + var persistence = state.persistence(); + var now = persistence.currentInstant(); + + var entityIdKey = IndexKey.key(entity.getId()); + var originalNameKey = byId.get(entityIdKey); + + var entityParentId = entity.getParentId(); + + if (originalNameKey == null) { + mutationResults.entityResult(ENTITY_NOT_FOUND); + return; + } + var originalRef = byName.get(originalNameKey); Review Comment: What are the scenarios that can lead to this `byName` not having this `originalNameKey` that we just got from `byId`? Would it indicate `byId` and `byName` getting out of sync somehow? ########## persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/mutation/MutationAttempt.java: ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.persistence.nosql.metastore.mutation; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.CATALOG_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_CANNOT_BE_RENAMED; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_NOT_FOUND; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.ENTITY_UNDROPPABLE; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.NAMESPACE_NOT_EMPTY; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_HAS_MAPPINGS; +import static org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; +import static org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.entityIdSet; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.entitySubTypeCodeFromObjType; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToEntity; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.mapToObj; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.maybeObjToPolarisPrincipalSecrets; +import static org.apache.polaris.persistence.nosql.coretypes.mapping.EntityObjMappings.objTypeForPolarisType; +import static org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER; +import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.identifierFromLocationString; +import static org.apache.polaris.persistence.nosql.metastore.ContentIdentifier.indexKeyToIdentifierBuilder; +import static org.apache.polaris.persistence.nosql.metastore.indexaccess.IndexUtils.hasChildren; +import static org.apache.polaris.persistence.nosql.metastore.mutation.MutationResults.newMutableMutationResults; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.storage.StorageLocation; +import org.apache.polaris.persistence.nosql.api.commit.CommitterState; +import org.apache.polaris.persistence.nosql.api.index.IndexKey; +import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.coretypes.ContainerObj; +import org.apache.polaris.persistence.nosql.coretypes.ObjBase; +import org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet; +import org.apache.polaris.persistence.nosql.coretypes.changes.Change; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeAdd; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRemove; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeRename; +import org.apache.polaris.persistence.nosql.coretypes.changes.ChangeUpdate; +import org.apache.polaris.persistence.nosql.coretypes.content.PolicyObj; +import 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj; +import org.apache.polaris.persistence.nosql.metastore.ContentIdentifier; +import org.apache.polaris.persistence.nosql.metastore.committers.ChangeResult; +import org.apache.polaris.persistence.nosql.metastore.indexaccess.MemoizedIndexedAccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public record MutationAttempt( + UpdateKeyForCatalogAndEntityType updateKeyForCatalogAndEntityType, + List<EntityUpdate> updates, + CommitterState<? extends ContainerObj, MutationResults> state, + UpdatableIndex<ObjRef> byName, + UpdatableIndex<IndexKey> byId, + UpdatableIndex<Change> changes, + UpdatableIndex<EntityIdSet> locations, + MemoizedIndexedAccess memoizedIndexedAccess) { + + private static final Logger LOGGER = LoggerFactory.getLogger(MutationAttempt.class); + + public static ObjBase objForChangeComparison( + PolarisBaseEntity entity, + Optional<PolarisPrincipalSecrets> currentSecrets, + ObjBase originalObj) { + return mapToObj(entity, currentSecrets) + .updateTimestamp(originalObj.createTimestamp()) Review Comment: Is it intentional to use `originalObj.createTimestamp` as the `updateTimestamp` for the change-comparison object? If so, it definitely needs an extensive comment explaining why. If the intent is to canonicalize `updateTimestamp` so that it is irrelevant for semantic comparison, it might be better to set it to 0L (or a similar sentinel) so that the missing value is obviously intentional, rather than having comparison objects floating around with the `createTimestamp` written into them. ########## persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/ContentIdentifier.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.metastore; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.ArrayList; +import java.util.List; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.index.IndexKey; +import org.apache.polaris.service.types.PolicyIdentifier; +import org.immutables.value.Value; + [email protected](underrideToString = "asDotDelimitedString") +@PolarisImmutable +public interface ContentIdentifier { + @Value.Parameter + @JsonValue + List<String> elements(); + + static ContentIdentifier identifier(List<String> elements) { + return ImmutableContentIdentifier.of(elements); + } + + static ContentIdentifier identifier(String[] namespace, String name) { + return ImmutableContentIdentifier.builder().addElements(namespace).addElements(name).build(); + } + + static ContentIdentifier identifier(String... 
elements) { + return ImmutableContentIdentifier.of(List.of(elements)); + } + + static ContentIdentifier identifierFor(PolicyIdentifier identifier) { + return identifier(identifier.getNamespace().levels(), identifier.getName()); + } + + static ContentIdentifier identifierFor(TableIdentifier identifier) { + return identifier(identifier.namespace().levels(), identifier.name()); + } + + static ContentIdentifier identifierFor(Namespace namespace) { + return identifier(namespace.levels()); + } + + static ContentIdentifier identifierFromLocationString(String locationString) { + var builder = builder(); + var len = locationString.length(); + var off = -1; + for (var i = 0; i < len; i++) { + var c = locationString.charAt(i); + checkArgument(c >= ' ', "Control characters are forbidden in locations"); + if (c == '/' || c == '\\') { Review Comment: That raises an interesting point - ideally we'd be able to incorporate storage-type-specific canonicalization logic. In this code, for example, the `\\` presumably targets "Windows local filesystem" use cases. Those really should be limited to test/dev scenarios where overlap checks probably aren't relevant anyway: it would be really weird to run any "sensitive" production use case on a single-node Windows environment with the engine and the Polaris catalog in the same place, or else on some NFS mount. But for a Java NIO filesystem, it's actually *correct* to canonicalize using real filesystem syntax for overlap detection, such as collapsing "foo/./././././bar" into "foo/bar" or reducing "foo/bar/.." to "foo". Overlap detection exists precisely to prevent security issues caused by shared directories, so it becomes important to prevent creative syntax from circumventing our check. And yet, as mentioned before, the "local filesystem" use cases, where paths have fancy syntactic interpretations, also happen to be the cases where the overlap check probably doesn't actually matter.
I guess it's fine to leave this as-is for now, but custom path parsing is probably not something we want to keep as ad-hoc inline logic here in the long term. It might be worth adding a comment or filing a GitHub issue to call out that this logic isn't necessarily authoritative, so that future developers don't treat it as gospel. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
