This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch cherry-pick-20e2ae1e-to-branch-1.2 in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 8822f88767ed69d83f8eab2b1c90b3341b79375f Author: yangyang zhong <[email protected]> AuthorDate: Fri Mar 20 19:31:24 2026 +0800 [#9518] improvement(authz): support batch get owner (#9914) ### What changes were proposed in this pull request? support batch get owner ### Why are the changes needed? Fix: #9518 ### Does this PR introduce _any_ user-facing change? None ### How was this patch tested? Existing IT --------- Co-authored-by: Rory <[email protected]> --- .../java/org/apache/gravitino/GravitinoEnv.java | 2 +- .../org/apache/gravitino/RelationalEntity.java | 87 ++++++++++ .../gravitino/SupportsRelationOperations.java | 29 ++++ .../gravitino/cache/CaffeineEntityCache.java | 10 ++ .../org/apache/gravitino/cache/EntityCache.java | 15 ++ .../org/apache/gravitino/cache/NoOpsCache.java | 7 + .../org/apache/gravitino/cache/SegmentedLock.java | 124 ++++++++++++-- .../gravitino/storage/relational/JDBCBackend.java | 39 +++++ .../storage/relational/RelationalEntityStore.java | 85 ++++++++++ .../storage/relational/mapper/OwnerMetaMapper.java | 9 ++ .../mapper/OwnerMetaSQLProviderFactory.java | 8 + .../provider/base/OwnerMetaBaseSQLProvider.java | 29 ++++ .../storage/relational/po/UserOwnerRelPO.java | 30 ++++ .../relational/service/OwnerMetaService.java | 65 ++++++++ .../relational/service/TestOwnerMetaService.java | 180 +++++++++++++++++++++ .../server/authorization/MetadataAuthzHelper.java | 20 +++ 16 files changed, 722 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java index b0a58554d4..4918457bd0 100644 --- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java +++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java @@ -456,7 +456,7 @@ public class GravitinoEnv { } public boolean cacheEnabled() { - return config.get(Configs.CACHE_ENABLED); + return config == null || config.get(Configs.CACHE_ENABLED); } public void start() { diff --git 
a/core/src/main/java/org/apache/gravitino/RelationalEntity.java b/core/src/main/java/org/apache/gravitino/RelationalEntity.java new file mode 100644 index 0000000000..4c9cd20bcc --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/RelationalEntity.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino; + +/** + * Represents a directed relation between two entities. The source is identified by a {@link + * NameIdentifier} and an {@link Entity.EntityType}; the target is the actual resolved entity, + * avoiding additional database round-trips to look it up. + * + * @param <T> the type of the target entity + */ +public class RelationalEntity<T extends Entity & HasIdentifier> { + private final SupportsRelationOperations.Type type; + private final NameIdentifier source; + private final Entity.EntityType sourceType; + private final T targetEntity; + + /** + * Constructs a RelationalEntity. 
+ * + * @param type the relation type + * @param source the source identifier + * @param sourceType the entity type of the source + * @param targetEntity the resolved target entity + */ + public RelationalEntity( + SupportsRelationOperations.Type type, + NameIdentifier source, + Entity.EntityType sourceType, + T targetEntity) { + this.type = type; + this.source = source; + this.sourceType = sourceType; + this.targetEntity = targetEntity; + } + + /** + * Gets the relation type. + * + * @return the type of the relation + */ + public SupportsRelationOperations.Type type() { + return type; + } + + /** + * Gets the source identifier. + * + * @return the source identifier + */ + public NameIdentifier source() { + return source; + } + + /** + * Gets the entity type of the source. + * + * @return the source entity type + */ + public Entity.EntityType sourceType() { + return sourceType; + } + + /** + * Gets the resolved target entity. + * + * @return the target entity + */ + public T targetEntity() { + return targetEntity; + } +} diff --git a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java index 7d8aca914e..854760060d 100644 --- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java +++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java @@ -92,6 +92,35 @@ public interface SupportsRelationOperations { Type relType, NameIdentifier nameIdentifier, Entity.EntityType identType, boolean allFields) throws IOException; + /** + * Retrieves the relations for a batch of source entities in a single call. + * + * <p>This is the batch counterpart of {@link #listEntitiesByRelation}. Instead of issuing one + * query per source entity, callers can supply a list of identifiers and receive all matching + * {@link RelationalEntity} objects in one round-trip to the backend. 
+ * + * <p>For example, to fetch the owners for a collection of tables in one call: + * + * <pre> + * batchListEntitiesByRelation(OWNER_REL, tableIdentifiers, EntityType.TABLE); + * </pre> + * + * <p>Each returned {@link RelationalEntity} carries the source identifier, source entity type, + * and the resolved target entity, allowing callers to correlate results back to their inputs. + * Source identifiers that have no related entity will produce no {@link RelationalEntity} entry + * in the result; they will not cause an error. + * + * @param relType The type of relation to query. + * @param nameIdentifiers The list of source entity identifiers to look up relations for. + * @param identType The entity type that each element in {@code nameIdentifiers} represents. + * @return A list of {@link RelationalEntity} objects, one per (source, target) pair found. May be + * empty but never null. + * @throws IOException If a storage-related error occurs during the batch query. + */ + List<RelationalEntity<?>> batchListEntitiesByRelation( + Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType identType) + throws IOException; + /** * Get a specific entity that is related to a given source entity. 
* diff --git a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java index ede392c996..952bb1e193 100644 --- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java +++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java @@ -340,6 +340,16 @@ public class CaffeineEntityCache extends BaseEntityCache { return segmentedLock.withLockAndThrow(key, action); } + /** {@inheritDoc} */ + @Override + public <T, E extends Exception> T withMultipleKeyCacheLock( + List<EntityCacheKey> keys, EntityCache.ThrowingSupplier<T, E> action) throws E { + Preconditions.checkArgument(keys != null, "Keys cannot be null"); + Preconditions.checkArgument(action != null, "Action cannot be null"); + + return segmentedLock.withMultipleKeyLockAndThrow(keys, action); + } + /** * Removes the expired entity from the cache. This method is a hook method for the Cache, when an * entry expires, it will call this method. diff --git a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java index 3ad6de1e8a..218c5099d0 100644 --- a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java +++ b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java @@ -19,6 +19,7 @@ package org.apache.gravitino.cache; +import java.util.List; import org.apache.gravitino.Entity; /** @@ -61,6 +62,20 @@ public interface EntityCache extends SupportsEntityStoreCache, SupportsRelationE <T, E extends Exception> T withCacheLock(EntityCacheKey key, ThrowingSupplier<T, E> action) throws E; + /** + * Acquires locks for multiple cache keys and executes the action, returning the result. Keys are + * locked in a consistent order to avoid deadlocks. 
+ * + * @param keys The cache keys to lock + * @param action The action to execute while holding all locks + * @param <T> The type of the result + * @param <E> The type of exception that may be thrown + * @return The result of the action + * @throws E if the action throws an exception of type E + */ + <T, E extends Exception> T withMultipleKeyCacheLock( + List<EntityCacheKey> keys, ThrowingSupplier<T, E> action) throws E; + /** * A functional interface that represents a supplier that may throw an exception. * diff --git a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java index 74babb75a5..a9c9939bc1 100644 --- a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java +++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java @@ -71,6 +71,13 @@ public class NoOpsCache extends BaseEntityCache { return opLock.withLockAndThrow(key, action); } + /** {@inheritDoc} */ + @Override + public <T, E extends Exception> T withMultipleKeyCacheLock( + List<EntityCacheKey> keys, ThrowingSupplier<T, E> action) throws E { + return opLock.withMultipleKeyLockAndThrow(keys, action); + } + /** {@inheritDoc} */ @Override public <E extends Entity & HasIdentifier> Optional<E> getIfPresent( diff --git a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java index dce8eb63ce..c620125900 100644 --- a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java +++ b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java @@ -21,18 +21,30 @@ package org.apache.gravitino.cache; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Striped; +import java.util.Comparator; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; +import 
java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Segmented lock for improved concurrency. Divides locks into segments to reduce contention. * Supports global clearing operations that require exclusive access to all segments. */ public class SegmentedLock { - private final Striped<Lock> stripedLocks; private static final Object NULL_KEY = new Object(); + private final Striped<Lock> stripedLocks; + + /** + * Stable index for each stripe lock, built once at construction. Used to sort locks in {@link + * #getDistinctSortedLocks} with a guaranteed total order (no hash-collision ties). + */ + private final Map<Lock, Integer> lockIndices; + /** CountDownLatch for global operations - null when no operation is in progress */ private final AtomicReference<CountDownLatch> globalOperationLatch = new AtomicReference<>(); @@ -50,6 +62,10 @@ public class SegmentedLock { } this.stripedLocks = Striped.lock(numSegments); + this.lockIndices = + IntStream.range(0, this.stripedLocks.size()) + .boxed() + .collect(Collectors.toMap(this.stripedLocks::getAt, i -> i)); } /** @@ -176,6 +192,65 @@ public class SegmentedLock { } } + /** + * Acquires locks for multiple keys in a consistent order (sorted by stripe index) to avoid + * deadlocks, then executes the action. + * + * @param keys The keys to lock + * @param action Action to run + * @param <E> Exception type + * @throws E Exception from the action + */ + public <E extends Exception> void withMultipleKeyLockAndThrow( + List<? extends Object> keys, EntityCache.ThrowingRunnable<E> action) throws E { + waitForGlobalComplete(); + List<Lock> sortedLocks = getDistinctSortedLocks(keys); + acquireAllLocks(sortedLocks); + try { + action.run(); + } finally { + releaseAllLocks(sortedLocks); + } + } + + /** + * Acquires locks for multiple keys in a consistent order (sorted by stripe index) to avoid + * deadlocks, then executes the action and returns the result. 
+ * + * @param keys The keys to lock + * @param action Action to run + * @param <T> Result type + * @param <E> Exception type + * @return Action result + * @throws E Exception from the action + */ + public <T, E extends Exception> T withMultipleKeyLockAndThrow( + List<? extends Object> keys, EntityCache.ThrowingSupplier<T, E> action) throws E { + waitForGlobalComplete(); + List<Lock> sortedLocks = getDistinctSortedLocks(keys); + acquireAllLocks(sortedLocks); + try { + return action.get(); + } finally { + releaseAllLocks(sortedLocks); + } + } + + /** Checks if a global operation is currently in progress. */ + @VisibleForTesting + public boolean isClearing() { + return globalOperationLatch.get() != null; + } + + /** + * Returns number of lock segments. + * + * @return Number of segments + */ + public int getNumSegments() { + return stripedLocks.size(); + } + /** * Executes a global clearing operation with exclusive access to all segments. This method sets * the clearing flag and ensures no other operations can proceed until the clearing is complete. @@ -202,6 +277,38 @@ public class SegmentedLock { } } + private List<Lock> getDistinctSortedLocks(List<? 
extends Object> keys) { + return keys.stream() + .map(this::getSegmentLock) + .distinct() + .sorted(Comparator.comparingInt(lockIndices::get)) + .collect(Collectors.toList()); + } + + private void acquireAllLocks(List<Lock> locks) { + int lockedCount = 0; + try { + for (Lock lock : locks) { + lock.lockInterruptibly(); + lockedCount++; + } + } catch (InterruptedException e) { + releaseLocks(locks, lockedCount); + Thread.currentThread().interrupt(); + throw new IllegalStateException("Thread was interrupted while acquiring batch lock", e); + } + } + + private static void releaseLocks(List<Lock> locks, int lockedCount) { + for (int i = lockedCount - 1; i >= 0; i--) { + locks.get(i).unlock(); + } + } + + private void releaseAllLocks(List<Lock> locks) { + releaseLocks(locks, locks.size()); + } + /** * Waits for any ongoing global operation to complete. This method is called by regular operations * to ensure they don't interfere with global operations. @@ -218,19 +325,4 @@ public class SegmentedLock { } } } - - /** Checks if a global operation is currently in progress. */ - @VisibleForTesting - public boolean isClearing() { - return globalOperationLatch.get() != null; - } - - /** - * Returns number of lock segments. 
- * - * @return Number of segments - */ - public int getNumSegments() { - return stripedLocks.size(); - } } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java index 2f4becd77e..900065f87a 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java @@ -38,6 +38,7 @@ import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.HasIdentifier; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.RelationalEntity; import org.apache.gravitino.SupportsRelationOperations; import org.apache.gravitino.UnsupportedEntityTypeException; import org.apache.gravitino.exceptions.NoSuchEntityException; @@ -350,6 +351,31 @@ public class JDBCBackend implements RelationalBackend { case JOB_TEMPLATE: return (List<E>) JobTemplateMetaService.getInstance().batchGetJobTemplateByIdentifier(identifiers); + case USER: + // TODO: I will add batch operations for users, groups and views + List<E> users = Lists.newArrayList(); + for (NameIdentifier identifier : identifiers) { + users.add((E) UserMetaService.getInstance().getUserByIdentifier(identifier)); + } + return users; + case GROUP: + List<E> groups = Lists.newArrayList(); + for (NameIdentifier identifier : identifiers) { + groups.add((E) GroupMetaService.getInstance().getGroupByIdentifier(identifier)); + } + return groups; + case ROLE: + List<E> roles = Lists.newArrayList(); + for (NameIdentifier identifier : identifiers) { + roles.add((E) RoleMetaService.getInstance().getRoleByIdentifier(identifier)); + } + return roles; + case VIEW: + List<E> views = Lists.newArrayList(); + for (NameIdentifier identifier : identifiers) { + views.add((E) ViewMetaService.getInstance().getViewByIdentifier(identifier)); + } + return views; default: throw 
new UnsupportedEntityTypeException( "Unsupported entity type: %s for batch get operation", entityType); @@ -669,6 +695,19 @@ public class JDBCBackend implements RelationalBackend { } } + @Override + public List<RelationalEntity<?>> batchListEntitiesByRelation( + Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType identType) + throws IOException { + switch (relType) { + case OWNER_REL: + return OwnerMetaService.getInstance().batchGetOwner(nameIdentifiers, identType); + default: + throw new IllegalArgumentException( + String.format("Doesn't support the relation type %s", relType)); + } + } + @Override public void insertRelation( SupportsRelationOperations.Type relType, diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java index 31183f2649..9fdd2c7886 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java @@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Function; import org.apache.commons.lang3.tuple.Pair; @@ -36,10 +38,12 @@ import org.apache.gravitino.EntityStore; import org.apache.gravitino.HasIdentifier; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.RelationalEntity; import org.apache.gravitino.SupportsRelationOperations; import org.apache.gravitino.cache.CacheFactory; import org.apache.gravitino.cache.CachedEntityIdResolver; import org.apache.gravitino.cache.EntityCache; +import org.apache.gravitino.cache.EntityCacheKey; import 
org.apache.gravitino.cache.EntityCacheRelationKey; import org.apache.gravitino.cache.NoOpsCache; import org.apache.gravitino.exceptions.NoSuchEntityException; @@ -225,6 +229,47 @@ public class RelationalEntityStore implements EntityStore, SupportsRelationOpera }); } + @Override + public List<RelationalEntity<?>> batchListEntitiesByRelation( + Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType identType) + throws IOException { + if (nameIdentifiers == null || nameIdentifiers.isEmpty()) { + return new ArrayList<>(); + } + + List<EntityCacheKey> lockKeys = new ArrayList<>(); + for (NameIdentifier id : nameIdentifiers) { + lockKeys.add(EntityCacheRelationKey.of(id, identType, relType)); + } + + return cache.withMultipleKeyCacheLock( + lockKeys, + () -> { + List<RelationalEntity<?>> result = new ArrayList<>(); + List<NameIdentifier> uncachedIdentifiers = new ArrayList<>(); + + for (NameIdentifier nameIdentifier : nameIdentifiers) { + Optional<List<RelationalEntity<?>>> cachedRelations = + getCachedRelations(relType, nameIdentifier, identType); + if (cachedRelations.isPresent()) { + result.addAll(cachedRelations.get()); + } else { + uncachedIdentifiers.add(nameIdentifier); + } + } + + if (!uncachedIdentifiers.isEmpty()) { + List<RelationalEntity<?>> backendRelations = + backend.batchListEntitiesByRelation(relType, uncachedIdentifiers, identType); + result.addAll(backendRelations); + + batchPopulateRelationCache(relType, identType, uncachedIdentifiers, backendRelations); + } + + return result; + }); + } + @Override public <E extends Entity & HasIdentifier> E getEntityByRelation( Type relType, @@ -318,4 +363,44 @@ public class RelationalEntityStore implements EntityStore, SupportsRelationOpera throws IOException, EntityAlreadyExistsException { backend.batchPut(entities, overwritten); } + + private <E extends Entity & HasIdentifier> Optional<List<RelationalEntity<?>>> getCachedRelations( + SupportsRelationOperations.Type relType, + NameIdentifier 
nameIdentifier, + Entity.EntityType identType) { + Optional<List<E>> entitiesOpt = cache.getIfPresent(relType, nameIdentifier, identType); + if (entitiesOpt.isPresent()) { + List<RelationalEntity<?>> cachedRelations = new ArrayList<>(); + for (E entity : entitiesOpt.get()) { + cachedRelations.add(new RelationalEntity<>(relType, nameIdentifier, identType, entity)); + } + return Optional.of(cachedRelations); + } + return Optional.empty(); + } + + private <E extends Entity & HasIdentifier> void batchPopulateRelationCache( + SupportsRelationOperations.Type relType, + Entity.EntityType identType, + List<NameIdentifier> uncachedIdentifiers, + List<RelationalEntity<?>> backendRelations) { + Map<NameIdentifier, List<RelationalEntity<?>>> relationsBySource = new HashMap<>(); + for (RelationalEntity<?> relation : backendRelations) { + relationsBySource.computeIfAbsent(relation.source(), k -> new ArrayList<>()).add(relation); + } + + for (NameIdentifier sourceId : uncachedIdentifiers) { + List<RelationalEntity<?>> sourceRelations = relationsBySource.get(sourceId); + List<E> entityList = new ArrayList<>(); + if (sourceRelations != null) { + for (RelationalEntity<?> rel : sourceRelations) { + @SuppressWarnings("unchecked") + E entity = (E) rel.targetEntity(); + entityList.add(entity); + } + } + + cache.put(sourceId, identType, relType, entityList); + } + } } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java index 455d78fb59..17247163e2 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java @@ -18,8 +18,10 @@ */ package org.apache.gravitino.storage.relational.mapper; +import java.util.List; import org.apache.gravitino.storage.relational.po.GroupPO; import 
org.apache.gravitino.storage.relational.po.OwnerRelPO; +import org.apache.gravitino.storage.relational.po.UserOwnerRelPO; import org.apache.gravitino.storage.relational.po.UserPO; import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; @@ -45,6 +47,13 @@ public interface OwnerMetaMapper { @Param("metadataObjectId") Long metadataObjectId, @Param("metadataObjectType") String metadataObjectType); + @SelectProvider( + type = OwnerMetaSQLProviderFactory.class, + method = "batchSelectUserOwnerMetaByMetadataObjectIdAndType") + List<UserOwnerRelPO> batchSelectUserOwnerMetaByMetadataObjectIdAndType( + @Param("metadataObjectIds") List<Long> metadataObjectIds, + @Param("metadataObjectType") String metadataObjectType); + @SelectProvider( type = OwnerMetaSQLProviderFactory.class, method = "selectGroupOwnerMetaByMetadataObjectIdAndType") diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java index b7b528a300..0dfcdc5bed 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java @@ -19,6 +19,7 @@ package org.apache.gravitino.storage.relational.mapper; import com.google.common.collect.ImmutableMap; +import java.util.List; import java.util.Map; import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; import org.apache.gravitino.storage.relational.mapper.provider.base.OwnerMetaBaseSQLProvider; @@ -96,4 +97,11 @@ public class OwnerMetaSQLProviderFactory { @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { return getProvider().deleteOwnerMetasByLegacyTimeline(legacyTimeline, limit); } + + public static String batchSelectUserOwnerMetaByMetadataObjectIdAndType( + @Param("metadataObjectIds") 
List<Long> metadataObjectIds, + @Param("metadataObjectType") String metadataObjectType) { + return getProvider() + .batchSelectUserOwnerMetaByMetadataObjectIdAndType(metadataObjectIds, metadataObjectType); + } } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java index e300c5bb44..ece7bfc22a 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java @@ -20,6 +20,7 @@ package org.apache.gravitino.storage.relational.mapper.provider.base; import static org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper.OWNER_TABLE_NAME; +import java.util.List; import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper; import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper; import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper; @@ -53,6 +54,34 @@ public class OwnerMetaBaseSQLProvider { + " ot.deleted_at = 0 AND ut.deleted_at = 0"; } + public String batchSelectUserOwnerMetaByMetadataObjectIdAndType( + @Param("metadataObjectIds") List<Long> metadataObjectIds, + @Param("metadataObjectType") String metadataObjectType) { + return "<script>" + + "SELECT ot.metadata_object_id as metadataObjectId," + + "ut.user_id as userId, " + + "ut.user_name as userName, " + + "ut.metalake_id as metalakeId, " + + "ut.audit_info as auditInfo, " + + "ut.current_version as currentVersion, " + + "ut.last_version as lastVersion, " + + "ut.deleted_at as deletedAt " + + "FROM " + + OWNER_TABLE_NAME + + " ot LEFT JOIN " + + UserMetaMapper.USER_TABLE_NAME + + " ut ON ut.user_id = ot.owner_id " + + "WHERE " + + "ot.metadata_object_type = #{metadataObjectType} " + + "AND ot.owner_type = 'USER' " + + "AND 
ot.metadata_object_id IN " + + "<foreach collection='metadataObjectIds' item='itemId' open='(' separator=',' close=')'>" + + "#{itemId}" + + "</foreach> " + + "AND ot.deleted_at = 0 AND ut.deleted_at = 0 " + + "</script>"; + } + public String selectGroupOwnerMetaByMetadataObjectIdAndType( @Param("metadataObjectId") Long metadataObjectId, @Param("metadataObjectType") String metadataObjectType) { diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java b/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java new file mode 100644 index 0000000000..969b49c1c1 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.storage.relational.po; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public class UserOwnerRelPO extends UserPO { + + private Long metadataObjectId; +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java index a68b2ebb3e..2a209e654f 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java @@ -20,15 +20,27 @@ package org.apache.gravitino.storage.relational.service; import static org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATIONAL_STORE_METRIC_NAME; +import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; import org.apache.gravitino.Entity; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.RelationalEntity; +import org.apache.gravitino.SupportsRelationOperations; import org.apache.gravitino.authorization.AuthorizationUtils; +import org.apache.gravitino.meta.UserEntity; import org.apache.gravitino.metrics.Monitored; import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper; import org.apache.gravitino.storage.relational.po.GroupPO; import org.apache.gravitino.storage.relational.po.OwnerRelPO; +import org.apache.gravitino.storage.relational.po.UserOwnerRelPO; import org.apache.gravitino.storage.relational.po.UserPO; import org.apache.gravitino.storage.relational.utils.POConverters; import 
org.apache.gravitino.storage.relational.utils.SessionUtils; @@ -79,6 +91,59 @@ public class OwnerMetaService { return Optional.empty(); } + @Monitored( + metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, + baseMetricName = "batchGetOwner") + public List<RelationalEntity<?>> batchGetOwner( + List<NameIdentifier> identifiers, Entity.EntityType type) { + if (CollectionUtils.isEmpty(identifiers)) { + return new ArrayList<>(); + } + String metalake = NameIdentifierUtil.getMetalake(identifiers.get(0)); + for (NameIdentifier identifier : identifiers) { + Preconditions.checkArgument( + Objects.equals(NameIdentifierUtil.getMetalake(identifier), metalake), + "identifiers should in one metalake"); + } + List<RelationalEntity<?>> result = new ArrayList<>(); + Map<Long, NameIdentifier> nameIdentifierMap = new HashMap<>(); + List<Long> entityIds = + identifiers.stream() + .map( + identifier -> { + long entityId = EntityIdService.getEntityId(identifier, type); + nameIdentifierMap.put(entityId, identifier); + return entityId; + }) + .collect(Collectors.toList()); + + // Get user owners + List<UserOwnerRelPO> userPOList = + SessionUtils.getWithoutCommit( + OwnerMetaMapper.class, + mapper -> + mapper.batchSelectUserOwnerMetaByMetadataObjectIdAndType(entityIds, type.name())); + if (CollectionUtils.isNotEmpty(userPOList)) { + userPOList.forEach( + userPO -> { + UserEntity userEntity = + POConverters.fromUserPO( + userPO, Collections.emptyList(), AuthorizationUtils.ofUserNamespace(metalake)); + result.add( + new RelationalEntity<>( + SupportsRelationOperations.Type.OWNER_REL, + nameIdentifierMap.get(userPO.getMetadataObjectId()), + type, + userEntity)); + }); + } + + // TODO: Add batch support for group owners when GroupOwnerRelPO and batch method are available + // For now, we only handle user owners in batch mode + + return result; + } + @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, baseMetricName = "setOwner") public void setOwner( NameIdentifier entity, 
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java index 8f42bb55c6..8d1ad4ca93 100644 --- a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java +++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java @@ -23,8 +23,15 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.gravitino.Entity; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.RelationalEntity; +import org.apache.gravitino.SupportsRelationOperations; import org.apache.gravitino.authorization.AuthorizationUtils; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.BaseMetalake; @@ -491,6 +498,168 @@ class TestOwnerMetaService extends TestJDBCBackend { Assertions.assertEquals(0, countActiveOwnerRel(user.id())); } + @TestTemplate + void testBatchGetOwnerWithUserOwners() throws IOException { + createAndInsertMakeLake(METALAKE_NAME); + CatalogEntity catalog = createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME); + SchemaEntity schema = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME); + TableEntity table = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME), + TABLE_NAME, + AUDIT_INFO); + backend.insert(table, false); + TopicEntity topic = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME), + TOPIC_NAME, + AUDIT_INFO); + backend.insert(topic, false); + + UserEntity user = + createUserEntity( + RandomIdGenerator.INSTANCE.nextId(), +
AuthorizationUtils.ofUserNamespace(METALAKE_NAME), + "user", + AUDIT_INFO); + backend.insert(user, false); + + OwnerMetaService.getInstance() + .setOwner(catalog.nameIdentifier(), catalog.type(), user.nameIdentifier(), user.type()); + OwnerMetaService.getInstance() + .setOwner(schema.nameIdentifier(), schema.type(), user.nameIdentifier(), user.type()); + OwnerMetaService.getInstance() + .setOwner(table.nameIdentifier(), table.type(), user.nameIdentifier(), user.type()); + OwnerMetaService.getInstance() + .setOwner(topic.nameIdentifier(), topic.type(), user.nameIdentifier(), user.type()); + + // batchGetOwner resolves every identifier with the single entity type it is given, so + // objects of different types are fetched with one call per type.
+ List<RelationalEntity<?>> catalogRelations = + OwnerMetaService.getInstance() + .batchGetOwner(List.of(catalog.nameIdentifier()), Entity.EntityType.CATALOG); + List<RelationalEntity<?>> schemaRelations = + OwnerMetaService.getInstance() + .batchGetOwner(List.of(schema.nameIdentifier()), Entity.EntityType.SCHEMA); + List<RelationalEntity<?>> tableRelations = + OwnerMetaService.getInstance() + .batchGetOwner(List.of(table.nameIdentifier()), Entity.EntityType.TABLE); + List<RelationalEntity<?>> topicRelations = + OwnerMetaService.getInstance() + .batchGetOwner(List.of(topic.nameIdentifier()), Entity.EntityType.TOPIC); + + Assertions.assertEquals(1, catalogRelations.size()); + Assertions.assertEquals(catalog.nameIdentifier(), catalogRelations.get(0).source()); + Assertions.assertEquals(Entity.EntityType.CATALOG, catalogRelations.get(0).sourceType()); + Assertions.assertEquals(Entity.EntityType.USER, catalogRelations.get(0).targetEntity().type()); + Assertions.assertEquals( + SupportsRelationOperations.Type.OWNER_REL, catalogRelations.get(0).type()); + + Assertions.assertEquals(1, schemaRelations.size()); + Assertions.assertEquals(schema.nameIdentifier(), schemaRelations.get(0).source()); + + Assertions.assertEquals(1, tableRelations.size()); + Assertions.assertEquals(table.nameIdentifier(), tableRelations.get(0).source()); + + Assertions.assertEquals(1, topicRelations.size()); + Assertions.assertEquals(topic.nameIdentifier(), topicRelations.get(0).source()); + + // All targets should be the same user + List<RelationalEntity<?>> allRelations = + List.of( + catalogRelations.get(0), + schemaRelations.get(0), + tableRelations.get(0), + topicRelations.get(0)); + for (RelationalEntity<?> rel : allRelations) { + Assertions.assertEquals(user.nameIdentifier(), rel.targetEntity().nameIdentifier()); + } + } + + @TestTemplate + void testBatchGetOwnerWithMultipleEntitiesOfSameType() throws IOException { + createAndInsertMakeLake(METALAKE_NAME); +
+ createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME); + SchemaEntity schema1 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME); + SchemaEntity schema2 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME + "_2"); + SchemaEntity schema3 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME + "_3"); + + UserEntity user1 = + createUserEntity( + RandomIdGenerator.INSTANCE.nextId(), + AuthorizationUtils.ofUserNamespace(METALAKE_NAME), + "user1", + AUDIT_INFO); + backend.insert(user1, false); + UserEntity user2 = + createUserEntity( + RandomIdGenerator.INSTANCE.nextId(), + AuthorizationUtils.ofUserNamespace(METALAKE_NAME), + "user2", + AUDIT_INFO); + backend.insert(user2, false); + + // schema1 -> user1, schema2 -> user2, schema3 has no owner + OwnerMetaService.getInstance() + .setOwner(schema1.nameIdentifier(), schema1.type(), user1.nameIdentifier(), user1.type()); + OwnerMetaService.getInstance() + .setOwner(schema2.nameIdentifier(), schema2.type(), user2.nameIdentifier(), user2.type()); + + List<RelationalEntity<?>> relations = + OwnerMetaService.getInstance() + .batchGetOwner( + List.of( + schema1.nameIdentifier(), schema2.nameIdentifier(), schema3.nameIdentifier()), + Entity.EntityType.SCHEMA); + + Assertions.assertEquals(2, relations.size()); + + Map<NameIdentifier, NameIdentifier> sourceToTarget = + relations.stream() + .collect( + Collectors.toMap(RelationalEntity::source, r -> r.targetEntity().nameIdentifier())); + + Assertions.assertEquals(user1.nameIdentifier(), sourceToTarget.get(schema1.nameIdentifier())); + Assertions.assertEquals(user2.nameIdentifier(), sourceToTarget.get(schema2.nameIdentifier())); + // schema3 has no owner — not present in results + Assertions.assertFalse(sourceToTarget.containsKey(schema3.nameIdentifier())); + + // Verify RelationalEntity metadata + for (RelationalEntity<?> rel : relations) { + Assertions.assertEquals(SupportsRelationOperations.Type.OWNER_REL, rel.type()); +
+ Assertions.assertEquals(Entity.EntityType.SCHEMA, rel.sourceType()); + Assertions.assertEquals(Entity.EntityType.USER, rel.targetEntity().type()); + } + } + + @TestTemplate + void testBatchGetOwnerWithEmptyInput() { + List<RelationalEntity<?>> relations = + OwnerMetaService.getInstance() + .batchGetOwner(Collections.emptyList(), Entity.EntityType.TABLE); + Assertions.assertNotNull(relations); + Assertions.assertTrue(relations.isEmpty()); + } + + @TestTemplate + void testBatchGetOwnerNoneHaveOwners() throws IOException { + createAndInsertMakeLake(METALAKE_NAME); + createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME); + SchemaEntity schema1 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME); + SchemaEntity schema2 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME + "_2"); + + List<RelationalEntity<?>> relations = + OwnerMetaService.getInstance() + .batchGetOwner( + List.of(schema1.nameIdentifier(), schema2.nameIdentifier()), + Entity.EntityType.SCHEMA); + + Assertions.assertNotNull(relations); + Assertions.assertTrue(relations.isEmpty()); + } + private Integer countAllOwnerRel(Long ownerId) { try (SqlSession sqlSession = SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java index 5a0f1ee8bf..b63251fd96 100644 --- a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java +++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java @@ -33,11 +33,13 @@ import java.util.function.Function; import org.apache.gravitino.Config; import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.MetadataObject; import
org.apache.gravitino.Metalake; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.SupportsRelationOperations; import org.apache.gravitino.authorization.AuthorizationRequestContext; import org.apache.gravitino.authorization.GravitinoAuthorizer; import org.apache.gravitino.dto.tag.MetadataObjectDTO; @@ -166,6 +168,7 @@ public class MetadataAuthzHelper { Entity.EntityType entityType, NameIdentifier[] nameIdentifiers) { preloadToCache(entityType, nameIdentifiers); + preloadOwner(entityType, nameIdentifiers); return filterByExpression(metalake, expression, entityType, nameIdentifiers, e -> e); } @@ -390,4 +393,39 @@ public class MetadataAuthzHelper { entityType, EntityClassMapper.getEntityClass(entityType)); } + + /** + * Issues one batch owner lookup for {@code nameIdentifiers} through the entity store and + * discards the result. Presumably this warms the entity cache so that later per-object owner + * checks are served from it, as {@code preloadToCache} does for the entities themselves. + * + * <p>Skipped when the cache is disabled, when there is nothing to load, or when no entity store + * is available. Failures are logged and ignored: preloading is best effort and must not make + * the caller's authorization filtering fail. + * + * @param entityType the entity type shared by all identifiers + * @param nameIdentifiers the objects whose owners are preloaded; may be null or empty + */ + private static void preloadOwner(Entity.EntityType entityType, NameIdentifier[] nameIdentifiers) { + if (!GravitinoEnv.getInstance().cacheEnabled() + || nameIdentifiers == null + || nameIdentifiers.length == 0) { + return; + } + EntityStore entityStore = GravitinoEnv.getInstance().entityStore(); + if (entityStore == null) { + return; + } + try { + entityStore + .relationOperations() + .batchListEntitiesByRelation( + SupportsRelationOperations.Type.OWNER_REL, + Arrays.asList(nameIdentifiers), + entityType); + } catch (Exception e) { + // Best effort: swallow and log so filtering proceeds without the preloaded owners. + LOG.warn("Ignore preloadOwner error:{}", e.getMessage(), e); + } + } }
