This is an automated email from the ASF dual-hosted git repository. dhuo pushed a commit to branch persistence-poc in repository https://gitbox.apache.org/repos/asf/polaris.git
commit b739fd1a70a198951e659d40b025651773528271 Author: Dennis Huo <[email protected]> AuthorDate: Wed Feb 26 08:19:38 2025 +0000 Add originalEntity to the writeEntity method to enable compare-and-swap behavior from the underlying BasePersistence. Push down all the deleteFromEntities* methods into PolarisMetaStoreSession and add deleteEntity to BasePersistence, which encapsulates handling the separate slices. --- .../polaris/core/persistence/BasePersistence.java | 11 +++-- .../persistence/PolarisMetaStoreManagerImpl.java | 48 +++++++++++++--------- .../core/persistence/PolarisMetaStoreSession.java | 32 +++++++++++++-- 3 files changed, 64 insertions(+), 27 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java index 0e9e2fab..11810331 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java @@ -57,11 +57,14 @@ public interface BasePersistence { * @param callCtx call context * @param entity entity to persist * @param nameOrParentChanged if true, also write it to by-name lookups if applicable + * @param originalEntity original state of the entity to use for compare-and-swap purposes, or + * null if this is expected to be a brand-new entity */ void writeEntity( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity, - boolean nameOrParentChanged); + boolean nameOrParentChanged, + @Nullable PolarisBaseEntity originalEntity); /** * Write the specified grantRecord to the grant_records table. If there is a conflict (existing @@ -75,12 +78,12 @@ public interface BasePersistence { @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec); /** - * Delete the base entity from the entities table. + * Delete this entity from the meta store. 
* * @param callCtx call context - * @param entity entity record to delete + * @param entity entity to delete */ - void deleteFromEntities(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity); + void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity); /** * Delete the specified grantRecord to the grant_records table. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java index cafb828c..8fa1b185 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java @@ -129,7 +129,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { entity.setToPurgeTimestamp(0); // write it - ms.writeEntity(callCtx, entity, true); + ms.writeEntity(callCtx, entity, true, null); } /** @@ -141,13 +141,15 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { * @param ms meta store * @param entity the entity which has been changed * @param nameOrParentChanged indicates if parent or name changed + * @param originalEntity the original state of the entity before changes * @return the entity with its version and lastUpdateTimestamp updated */ private @Nonnull PolarisBaseEntity persistEntityAfterChange( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisMetaStoreSession ms, @Nonnull PolarisBaseEntity entity, - boolean nameOrParentChanged) { + boolean nameOrParentChanged, + @Nonnull PolarisBaseEntity originalEntity) { // validate the entity type and subtype callCtx.getDiagServices().checkNotNull(entity, "unexpected_null_entity"); @@ -190,7 +192,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { entity.setEntityVersion(entity.getEntityVersion() + 1); // persist it to the various 
slices - ms.writeEntity(callCtx, entity, nameOrParentChanged); + ms.writeEntity(callCtx, entity, nameOrParentChanged, originalEntity); // return it return entity; @@ -222,9 +224,6 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // creation timestamp must be filled callCtx.getDiagServices().check(entity.getDropTimestamp() == 0, "already_dropped"); - // delete it from active slice - ms.deleteFromEntitiesActive(callCtx, entity); - // for now drop all associated grants, etc. synchronously // delete ALL grant records to (if the entity is a grantee) and from that entity final List<PolarisGrantRecord> grantsOnGrantee = @@ -252,13 +251,13 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { List<PolarisBaseEntity> entities = ms.lookupEntities(callCtx, new ArrayList<>(entityIdsGrantChanged)); for (PolarisBaseEntity entityGrantChanged : entities) { + PolarisBaseEntity originalEntity = new PolarisBaseEntity(entityGrantChanged); entityGrantChanged.setGrantRecordsVersion(entityGrantChanged.getGrantRecordsVersion() + 1); - ms.writeEntity(callCtx, entityGrantChanged, false); + ms.writeEntity(callCtx, entityGrantChanged, false, originalEntity); } // remove the entity being dropped now - ms.deleteFromEntities(callCtx, entity); - ms.deleteFromEntitiesChangeTracking(callCtx, entity); + ms.deleteEntity(callCtx, entity); // if it is a principal, we also need to drop the secrets if (entity.getType() == PolarisEntityType.PRINCIPAL) { @@ -321,10 +320,11 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { callCtx .getDiagServices() .checkNotNull(granteeEntity, "grantee_not_found", "grantee={}", grantee); + PolarisBaseEntity originalGranteeEntity = new PolarisBaseEntity(granteeEntity); // grants have changed, we need to bump-up the grants version granteeEntity.setGrantRecordsVersion(granteeEntity.getGrantRecordsVersion() + 1); - ms.writeEntity(callCtx, granteeEntity, false); + ms.writeEntity(callCtx, granteeEntity, 
false, originalGranteeEntity); // we also need to invalidate the grants on that securable so that we can reload them. // load the securable and increment its grants version @@ -333,10 +333,11 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { callCtx .getDiagServices() .checkNotNull(securableEntity, "securable_not_found", "securable={}", securable); + PolarisBaseEntity originalSecurableEntity = new PolarisBaseEntity(securableEntity); // grants have changed, we need to bump-up the grants version securableEntity.setGrantRecordsVersion(securableEntity.getGrantRecordsVersion() + 1); - ms.writeEntity(callCtx, securableEntity, false); + ms.writeEntity(callCtx, securableEntity, false, originalSecurableEntity); // done, return the new grant record return grantRecord; @@ -396,10 +397,11 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { .getDiagServices() .checkNotNull( refreshGrantee, "missing_grantee", "grantRecord={} grantee={}", grantRecord, grantee); + PolarisBaseEntity originalRefreshGrantee = new PolarisBaseEntity(refreshGrantee); // grants have changed, we need to bump-up the grants version refreshGrantee.setGrantRecordsVersion(refreshGrantee.getGrantRecordsVersion() + 1); - ms.writeEntity(callCtx, refreshGrantee, false); + ms.writeEntity(callCtx, refreshGrantee, false, originalRefreshGrantee); // we also need to invalidate the grants on that securable so that we can reload them. 
// load the securable and increment its grants version @@ -413,10 +415,11 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { "grantRecord={} securable={}", grantRecord, securable); + PolarisBaseEntity originalRefreshSecurable = new PolarisBaseEntity(refreshSecurable); // grants have changed, we need to bump-up the grants version refreshSecurable.setGrantRecordsVersion(refreshSecurable.getGrantRecordsVersion() + 1); - ms.writeEntity(callCtx, refreshSecurable, false); + ms.writeEntity(callCtx, refreshSecurable, false, originalRefreshSecurable); } /** @@ -942,6 +945,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { } PolarisBaseEntity principal = loadEntityResult.getEntity(); + PolarisBaseEntity originalPrincipal = new PolarisBaseEntity(principal); Map<String, String> internalProps = PolarisObjectMapperUtil.deserializeProperties( callCtx, @@ -963,14 +967,14 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { principal.setInternalProperties( PolarisObjectMapperUtil.serializeProperties(callCtx, internalProps)); principal.setEntityVersion(principal.getEntityVersion() + 1); - ms.writeEntity(callCtx, principal, true); + ms.writeEntity(callCtx, principal, true, originalPrincipal); } else if (internalProps.containsKey( PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE)) { internalProps.remove(PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE); principal.setInternalProperties( PolarisObjectMapperUtil.serializeProperties(callCtx, internalProps)); principal.setEntityVersion(principal.getEntityVersion() + 1); - ms.writeEntity(callCtx, principal, true); + ms.writeEntity(callCtx, principal, true, originalPrincipal); } return secrets; } @@ -1153,6 +1157,8 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { return new EntityResult(BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED, null); } + PolarisBaseEntity originalEntity = new 
PolarisBaseEntity(entityRefreshed); + // update the two properties entityRefreshed.setInternalProperties(entity.getInternalProperties()); entityRefreshed.setProperties(entity.getProperties()); @@ -1160,7 +1166,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // persist this entity after changing it. This will update the version and update the last // updated time. Because the entity version is changed, we will update the change tracking table PolarisBaseEntity persistedEntity = - this.persistEntityAfterChange(callCtx, ms, entityRefreshed, false); + this.persistEntityAfterChange(callCtx, ms, entityRefreshed, false, originalEntity); return new EntityResult(persistedEntity); } @@ -1305,8 +1311,9 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS, entityActiveRecord.getSubTypeCode()); } - // all good, delete the existing entity from the active slice - ms.deleteFromEntitiesActive(callCtx, refreshEntityToRename); + // Create a copy of the original before we change its fields so that we can pass in the + // old version for the persistence layer to work out whether to unlink previous name-lookups + PolarisBaseEntity originalEntity = new PolarisBaseEntity(refreshEntityToRename); // change its name now refreshEntityToRename.setName(renamedEntity.getName()); @@ -1322,7 +1329,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // version. 
Indicate that the nameOrParent changed, so so that we also update any by-name // lookups if applicable PolarisBaseEntity renamedEntityToReturn = - this.persistEntityAfterChange(callCtx, ms, refreshEntityToRename, true); + this.persistEntityAfterChange(callCtx, ms, refreshEntityToRename, true, originalEntity); return new EntityResult(renamedEntityToReturn); } @@ -1961,6 +1968,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { availableTasks.forEach( task -> { + PolarisBaseEntity originalTask = new PolarisBaseEntity(task); Map<String, String> properties = PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); @@ -1974,7 +1982,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { + 1)); task.setEntityVersion(task.getEntityVersion() + 1); task.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - ms.writeEntity(callCtx, task, false); + ms.writeEntity(callCtx, task, false, originalTask); }); return new EntitiesResult(availableTasks); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java index 9eca18df..f18b57ae 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java @@ -157,11 +157,20 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { public void writeEntity( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity, - boolean nameOrParentChanged) { + boolean nameOrParentChanged, + @Nullable PolarisBaseEntity originalEntity) { + // TODO: Pull down relevant compare-and-swap semantics from PolarisMetaStoreManagerImpl + // into this layer. 
writeToEntities(callCtx, entity); writeToEntitiesChangeTracking(callCtx, entity); if (nameOrParentChanged) { + if (originalEntity != null) { + // In our case, rename isn't automatically handled when the main "entities" slice + // is updated; instead we must explicitly remove from the old entitiesActive + // key as well. + deleteFromEntitiesActive(callCtx, originalEntity); + } writeToEntitiesActive(callCtx, entity); } } @@ -199,13 +208,30 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { protected abstract void writeToEntitiesChangeTracking( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity); + /** {@inheritDoc} */ + @Override + public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { + deleteFromEntitiesActive(callCtx, entity); + deleteFromEntities(callCtx, entity); + deleteFromEntitiesChangeTracking(callCtx, entity); + } + + /** + * Delete the base entity from the entities table. + * + * @param callCtx call context + * @param entity entity record to delete + */ + protected abstract void deleteFromEntities( + @Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity); + /** * Delete the base entity from the entities_active table. * * @param callCtx call context * @param entity entity record to delete */ - public abstract void deleteFromEntitiesActive( + protected abstract void deleteFromEntitiesActive( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity); /** @@ -214,7 +240,7 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { * @param callCtx call context * @param entity entity record to delete */ - public abstract void deleteFromEntitiesChangeTracking( + protected abstract void deleteFromEntitiesChangeTracking( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity); /** Rollback the current transaction */
