This is an automated email from the ASF dual-hosted git repository. dhuo pushed a commit to branch persistence-poc in repository https://gitbox.apache.org/repos/asf/polaris.git
commit 5743dd260d0c8b659e2a2a48032138b5f642a522 Author: Dennis Huo <[email protected]> AuthorDate: Wed Feb 26 07:39:36 2025 +0000 Pushdown all calls to writeToEntities into PolarisMetaStoreSession, and add writeEntity method to BasePersistence, with a default impl in PolarisMetaStoreSession containing what was previously in PolarisMetaStoreManagerImpl. This now protects all writes in PolarisMetaStoreManagerImpl from dealing with the three-table implementation detail. Technically slightly changes the ordering of updates within a transaction for renameEntity, but is arguably a more correct ordering, and the ordering doesn't interleave reads anyways. --- .../polaris/core/persistence/BasePersistence.java | 12 +++-- .../persistence/PolarisMetaStoreManagerImpl.java | 59 +++++++--------------- .../core/persistence/PolarisMetaStoreSession.java | 33 +++++++++++- 3 files changed, 57 insertions(+), 47 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java index 9ac8268b..0e9e2fab 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java @@ -52,14 +52,16 @@ public interface BasePersistence { long generateNewId(@Nonnull PolarisCallContext callCtx); /** - * Write the base entity to the entities table. If there is a conflict (existing record with the - * same id), all attributes of the new record will replace the existing one. + * Write this entity to the meta store. * * @param callCtx call context - * @param entity entity record to write, potentially replacing an existing entity record with the - * same key + * @param entity entity to persist + * @param nameOrParentChanged if true, also write it to by-name lookups if applicable */ - void writeToEntities(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity); + void writeEntity( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisBaseEntity entity, + boolean nameOrParentChanged); /** * Write the specified grantRecord to the grant_records table. If there is a conflict (existing diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java index 9fa92514..cafb828c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java @@ -68,27 +68,6 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { /** mapper, allows to serialize/deserialize properties to/from JSON */ private static final ObjectMapper MAPPER = new ObjectMapper(); - /** - * Write this entity to the meta store. - * - * @param callCtx call context - * @param ms meta store in read/write mode - * @param entity entity to persist - * @param writeToActive if true, write it to active - */ - private void writeEntity( - @Nonnull PolarisCallContext callCtx, - @Nonnull PolarisMetaStoreSession ms, - @Nonnull PolarisBaseEntity entity, - boolean writeToActive) { - ms.writeToEntities(callCtx, entity); - ms.writeToEntitiesChangeTracking(callCtx, entity); - - if (writeToActive) { - ms.writeToEntitiesActive(callCtx, entity); - } - } - /** * Persist the specified new entity. Persist will write this entity in the ENTITIES, in the * ENTITIES_ACTIVE and finally in the ENTITIES_CHANGE_TRACKING tables @@ -150,7 +129,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { entity.setToPurgeTimestamp(0); // write it - this.writeEntity(callCtx, ms, entity, true); + ms.writeEntity(callCtx, entity, true); } /** @@ -161,12 +140,14 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { * @param callCtx call context * @param ms meta store * @param entity the entity which has been changed + * @param nameOrParentChanged indicates if parent or name changed * @return the entity with its version and lastUpdateTimestamp updated */ private @Nonnull PolarisBaseEntity persistEntityAfterChange( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisMetaStoreSession ms, - @Nonnull PolarisBaseEntity entity) { + @Nonnull PolarisBaseEntity entity, + boolean nameOrParentChanged) { // validate the entity type and subtype callCtx.getDiagServices().checkNotNull(entity, "unexpected_null_entity"); @@ -209,7 +190,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { entity.setEntityVersion(entity.getEntityVersion() + 1); // persist it to the various slices - this.writeEntity(callCtx, ms, entity, false); + ms.writeEntity(callCtx, entity, nameOrParentChanged); // return it return entity; @@ -272,8 +253,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { ms.lookupEntities(callCtx, new ArrayList<>(entityIdsGrantChanged)); for (PolarisBaseEntity entityGrantChanged : entities) { entityGrantChanged.setGrantRecordsVersion(entityGrantChanged.getGrantRecordsVersion() + 1); - ms.writeToEntities(callCtx, entityGrantChanged); - ms.writeToEntitiesChangeTracking(callCtx, entityGrantChanged); + ms.writeEntity(callCtx, entityGrantChanged, false); } // remove the entity being dropped now @@ -344,7 +324,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // grants have changed, we need to bump-up the grants version granteeEntity.setGrantRecordsVersion(granteeEntity.getGrantRecordsVersion() + 1); - this.writeEntity(callCtx, ms, granteeEntity, false); + ms.writeEntity(callCtx, granteeEntity, false); // we also need to invalidate the grants on that securable so that we can reload them. // load the securable and increment its grants version @@ -356,7 +336,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // grants have changed, we need to bump-up the grants version securableEntity.setGrantRecordsVersion(securableEntity.getGrantRecordsVersion() + 1); - this.writeEntity(callCtx, ms, securableEntity, false); + ms.writeEntity(callCtx, securableEntity, false); // done, return the new grant record return grantRecord; @@ -419,7 +399,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // grants have changed, we need to bump-up the grants version refreshGrantee.setGrantRecordsVersion(refreshGrantee.getGrantRecordsVersion() + 1); - this.writeEntity(callCtx, ms, refreshGrantee, false); + ms.writeEntity(callCtx, refreshGrantee, false); // we also need to invalidate the grants on that securable so that we can reload them. // load the securable and increment its grants version @@ -436,7 +416,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // grants have changed, we need to bump-up the grants version refreshSecurable.setGrantRecordsVersion(refreshSecurable.getGrantRecordsVersion() + 1); - this.writeEntity(callCtx, ms, refreshSecurable, false); + ms.writeEntity(callCtx, refreshSecurable, false); } /** @@ -983,14 +963,14 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { principal.setInternalProperties( PolarisObjectMapperUtil.serializeProperties(callCtx, internalProps)); principal.setEntityVersion(principal.getEntityVersion() + 1); - writeEntity(callCtx, ms, principal, true); + ms.writeEntity(callCtx, principal, true); } else if (internalProps.containsKey( PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE)) { internalProps.remove(PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE); principal.setInternalProperties( PolarisObjectMapperUtil.serializeProperties(callCtx, internalProps)); principal.setEntityVersion(principal.getEntityVersion() + 1); - writeEntity(callCtx, ms, principal, true); + ms.writeEntity(callCtx, principal, true); } return secrets; } @@ -1179,7 +1159,8 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { // persist this entity after changing it. This will update the version and update the last // updated time. Because the entity version is changed, we will update the change tracking table - PolarisBaseEntity persistedEntity = this.persistEntityAfterChange(callCtx, ms, entityRefreshed); + PolarisBaseEntity persistedEntity = + this.persistEntityAfterChange(callCtx, ms, entityRefreshed, false); return new EntityResult(persistedEntity); } @@ -1337,13 +1318,11 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { refreshEntityToRename.setParentId(resolver.getParentId()); } - // persist back to the active slice with its new name and parent - ms.writeToEntitiesActive(callCtx, refreshEntityToRename); - - // persist the entity after change. This wil update the lastUpdateTimestamp and bump up the - // version + // persist the entity after change. This will update the lastUpdateTimestamp and bump up the + // version. Indicate that the nameOrParent changed, so so that we also update any by-name + // lookups if applicable PolarisBaseEntity renamedEntityToReturn = - this.persistEntityAfterChange(callCtx, ms, refreshEntityToRename); + this.persistEntityAfterChange(callCtx, ms, refreshEntityToRename, true); return new EntityResult(renamedEntityToReturn); } @@ -1995,7 +1974,7 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager { + 1)); task.setEntityVersion(task.getEntityVersion() + 1); task.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - writeEntity(callCtx, ms, task, false); + ms.writeEntity(callCtx, task, false); }); return new EntitiesResult(availableTasks); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java index 883ac0f2..9eca18df 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java @@ -91,6 +91,10 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { long parentId, int typeCode, @Nonnull String name) { + // TODO: Consistently pull down the runInTransaction logic without running into conflicting + // nested transactions into here so that instead of having the caller be responsible for + // initiating the runInReadTransaction, we make this method call inherently safe to do + // the two-phase lookup. PolarisEntitiesActiveKey entityActiveKey = new PolarisEntitiesActiveKey(catalogId, parentId, typeCode, name); @@ -148,6 +152,31 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { public abstract List<PolarisEntityActiveRecord> lookupEntityActiveBatch( @Nonnull PolarisCallContext callCtx, List<PolarisEntitiesActiveKey> entityActiveKeys); + /** {@inheritDoc} */ + @Override + public void writeEntity( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisBaseEntity entity, + boolean nameOrParentChanged) { + writeToEntities(callCtx, entity); + writeToEntitiesChangeTracking(callCtx, entity); + + if (nameOrParentChanged) { + writeToEntitiesActive(callCtx, entity); + } + } + + /** + * Write the base entity to the entities table. If there is a conflict (existing record with the + * same id), all attributes of the new record will replace the existing one. + * + * @param callCtx call context + * @param entity entity record to write, potentially replacing an existing entity record with the + * same key + */ + protected abstract void writeToEntities( + @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity); + /** * Write the base entity to the entities_active table. If there is a conflict (existing record * with the same PK), all attributes of the new record will replace the existing one. @@ -156,7 +185,7 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { * @param entity entity record to write, potentially replacing an existing entity record with the * same key */ - public abstract void writeToEntitiesActive( + protected abstract void writeToEntitiesActive( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity); /** @@ -167,7 +196,7 @@ public abstract class PolarisMetaStoreSession implements BasePersistence { * @param entity entity record to write, potentially replacing an existing entity record with the * same key */ - public abstract void writeToEntitiesChangeTracking( + protected abstract void writeToEntitiesChangeTracking( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity); /**
