This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 20e2ae1ebf [#9518] improvement(authz): support batch get owner (#9914)
20e2ae1ebf is described below
commit 20e2ae1ebf55059a4ee8b845075e863171746125
Author: yangyang zhong <[email protected]>
AuthorDate: Fri Mar 20 19:31:24 2026 +0800
[#9518] improvement(authz): support batch get owner (#9914)
### What changes were proposed in this pull request?
support batch get owner
### Why are the changes needed?
Fix: #9518
### Does this PR introduce _any_ user-facing change?
None
### How was this patch tested?
Existing IT
---------
Co-authored-by: Rory <[email protected]>
---
.../java/org/apache/gravitino/GravitinoEnv.java | 2 +-
.../org/apache/gravitino/RelationalEntity.java | 87 ++++++++++
.../gravitino/SupportsRelationOperations.java | 29 ++++
.../gravitino/cache/CaffeineEntityCache.java | 10 ++
.../org/apache/gravitino/cache/EntityCache.java | 15 ++
.../org/apache/gravitino/cache/NoOpsCache.java | 7 +
.../org/apache/gravitino/cache/SegmentedLock.java | 124 ++++++++++++--
.../gravitino/storage/relational/JDBCBackend.java | 39 +++++
.../storage/relational/RelationalEntityStore.java | 85 ++++++++++
.../storage/relational/mapper/OwnerMetaMapper.java | 9 ++
.../mapper/OwnerMetaSQLProviderFactory.java | 8 +
.../provider/base/OwnerMetaBaseSQLProvider.java | 29 ++++
.../storage/relational/po/UserOwnerRelPO.java | 30 ++++
.../relational/service/OwnerMetaService.java | 65 ++++++++
.../relational/service/TestOwnerMetaService.java | 180 +++++++++++++++++++++
.../server/authorization/MetadataAuthzHelper.java | 20 +++
16 files changed, 722 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index b0a58554d4..4918457bd0 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -456,7 +456,7 @@ public class GravitinoEnv {
}
public boolean cacheEnabled() {
- return config.get(Configs.CACHE_ENABLED);
+ return config == null || config.get(Configs.CACHE_ENABLED);
}
public void start() {
diff --git a/core/src/main/java/org/apache/gravitino/RelationalEntity.java
b/core/src/main/java/org/apache/gravitino/RelationalEntity.java
new file mode 100644
index 0000000000..4c9cd20bcc
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/RelationalEntity.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino;
+
+/**
+ * Represents a directed relation between two entities. The source is
identified by a {@link
+ * NameIdentifier} and an {@link Entity.EntityType}; the target is the actual
resolved entity,
+ * avoiding additional database round-trips to look it up.
+ *
+ * @param <T> the type of the target entity
+ */
+public class RelationalEntity<T extends Entity & HasIdentifier> {
+ private final SupportsRelationOperations.Type type;
+ private final NameIdentifier source;
+ private final Entity.EntityType sourceType;
+ private final T targetEntity;
+
+ /**
+ * Constructs a RelationalEntity.
+ *
+ * @param type the relation type
+ * @param source the source identifier
+ * @param sourceType the entity type of the source
+ * @param targetEntity the resolved target entity
+ */
+ public RelationalEntity(
+ SupportsRelationOperations.Type type,
+ NameIdentifier source,
+ Entity.EntityType sourceType,
+ T targetEntity) {
+ this.type = type;
+ this.source = source;
+ this.sourceType = sourceType;
+ this.targetEntity = targetEntity;
+ }
+
+ /**
+ * Gets the relation type.
+ *
+ * @return the type of the relation
+ */
+ public SupportsRelationOperations.Type type() {
+ return type;
+ }
+
+ /**
+ * Gets the source identifier.
+ *
+ * @return the source identifier
+ */
+ public NameIdentifier source() {
+ return source;
+ }
+
+ /**
+ * Gets the entity type of the source.
+ *
+ * @return the source entity type
+ */
+ public Entity.EntityType sourceType() {
+ return sourceType;
+ }
+
+ /**
+ * Gets the resolved target entity.
+ *
+ * @return the target entity
+ */
+ public T targetEntity() {
+ return targetEntity;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 7d8aca914e..854760060d 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -92,6 +92,35 @@ public interface SupportsRelationOperations {
Type relType, NameIdentifier nameIdentifier, Entity.EntityType
identType, boolean allFields)
throws IOException;
+ /**
+ * Retrieves the relations for a batch of source entities in a single call.
+ *
+ * <p>This is the batch counterpart of {@link #listEntitiesByRelation}.
Instead of issuing one
+ * query per source entity, callers can supply a list of identifiers and
receive all matching
+ * {@link RelationalEntity} objects in one round-trip to the backend.
+ *
+ * <p>For example, to fetch the owners for a collection of tables in one
call:
+ *
+ * <pre>
+ * batchListEntitiesByRelation(OWNER_REL, tableIdentifiers,
EntityType.TABLE);
+ * </pre>
+ *
+ * <p>Each returned {@link RelationalEntity} carries the source identifier,
source entity type,
+ * and the resolved target entity, allowing callers to correlate results
back to their inputs.
+ * Source identifiers that have no related entity will produce no {@link
RelationalEntity} entry
+ * in the result; they will not cause an error.
+ *
+ * @param relType The type of relation to query.
+ * @param nameIdentifiers The list of source entity identifiers to look up
relations for.
+ * @param identType The entity type that each element in {@code
nameIdentifiers} represents.
+ * @return A list of {@link RelationalEntity} objects, one per (source,
target) pair found. May be
+ * empty but never null.
+ * @throws IOException If a storage-related error occurs during the batch
query.
+ */
+ List<RelationalEntity<?>> batchListEntitiesByRelation(
+ Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType
identType)
+ throws IOException;
+
/**
* Get a specific entity that is related to a given source entity.
*
diff --git
a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
index ede392c996..952bb1e193 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
@@ -340,6 +340,16 @@ public class CaffeineEntityCache extends BaseEntityCache {
return segmentedLock.withLockAndThrow(key, action);
}
+ /** {@inheritDoc} */
+ @Override
+ public <T, E extends Exception> T withMultipleKeyCacheLock(
+ List<EntityCacheKey> keys, EntityCache.ThrowingSupplier<T, E> action)
throws E {
+ Preconditions.checkArgument(keys != null, "Keys cannot be null");
+ Preconditions.checkArgument(action != null, "Action cannot be null");
+
+ return segmentedLock.withMultipleKeyLockAndThrow(keys, action);
+ }
+
/**
* Removes the expired entity from the cache. This method is a hook method
for the Cache, when an
* entry expires, it will call this method.
diff --git a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
index 3ad6de1e8a..218c5099d0 100644
--- a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.cache;
+import java.util.List;
import org.apache.gravitino.Entity;
/**
@@ -61,6 +62,20 @@ public interface EntityCache extends
SupportsEntityStoreCache, SupportsRelationE
<T, E extends Exception> T withCacheLock(EntityCacheKey key,
ThrowingSupplier<T, E> action)
throws E;
+ /**
+ * Acquires locks for multiple cache keys and executes the action, returning
the result. Keys are
+ * locked in a consistent order to avoid deadlocks.
+ *
+ * @param keys The cache keys to lock
+ * @param action The action to execute while holding all locks
+ * @param <T> The type of the result
+ * @param <E> The type of exception that may be thrown
+ * @return The result of the action
+ * @throws E if the action throws an exception of type E
+ */
+ <T, E extends Exception> T withMultipleKeyCacheLock(
+ List<EntityCacheKey> keys, ThrowingSupplier<T, E> action) throws E;
+
/**
* A functional interface that represents a supplier that may throw an
exception.
*
diff --git a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
index 74babb75a5..a9c9939bc1 100644
--- a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
@@ -71,6 +71,13 @@ public class NoOpsCache extends BaseEntityCache {
return opLock.withLockAndThrow(key, action);
}
+ /** {@inheritDoc} */
+ @Override
+ public <T, E extends Exception> T withMultipleKeyCacheLock(
+ List<EntityCacheKey> keys, ThrowingSupplier<T, E> action) throws E {
+ return opLock.withMultipleKeyLockAndThrow(keys, action);
+ }
+
/** {@inheritDoc} */
@Override
public <E extends Entity & HasIdentifier> Optional<E> getIfPresent(
diff --git a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
index dce8eb63ce..c620125900 100644
--- a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
+++ b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
@@ -21,18 +21,30 @@ package org.apache.gravitino.cache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Striped;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Segmented lock for improved concurrency. Divides locks into segments to
reduce contention.
* Supports global clearing operations that require exclusive access to all
segments.
*/
public class SegmentedLock {
- private final Striped<Lock> stripedLocks;
private static final Object NULL_KEY = new Object();
+ private final Striped<Lock> stripedLocks;
+
+ /**
+ * Stable index for each stripe lock, built once at construction. Used to
sort locks in {@link
+ * #getDistinctSortedLocks} with a guaranteed total order (no hash-collision
ties).
+ */
+ private final Map<Lock, Integer> lockIndices;
+
/** CountDownLatch for global operations - null when no operation is in
progress */
private final AtomicReference<CountDownLatch> globalOperationLatch = new
AtomicReference<>();
@@ -50,6 +62,10 @@ public class SegmentedLock {
}
this.stripedLocks = Striped.lock(numSegments);
+ this.lockIndices =
+ IntStream.range(0, this.stripedLocks.size())
+ .boxed()
+ .collect(Collectors.toMap(this.stripedLocks::getAt, i -> i));
}
/**
@@ -176,6 +192,65 @@ public class SegmentedLock {
}
}
+ /**
+ * Acquires locks for multiple keys in a consistent order (sorted by stripe
index) to avoid
+ * deadlocks, then executes the action.
+ *
+ * @param keys The keys to lock
+ * @param action Action to run
+ * @param <E> Exception type
+ * @throws E Exception from the action
+ */
+ public <E extends Exception> void withMultipleKeyLockAndThrow(
+ List<? extends Object> keys, EntityCache.ThrowingRunnable<E> action)
throws E {
+ waitForGlobalComplete();
+ List<Lock> sortedLocks = getDistinctSortedLocks(keys);
+ acquireAllLocks(sortedLocks);
+ try {
+ action.run();
+ } finally {
+ releaseAllLocks(sortedLocks);
+ }
+ }
+
+ /**
+ * Acquires locks for multiple keys in a consistent order (sorted by stripe
index) to avoid
+ * deadlocks, then executes the action and returns the result.
+ *
+ * @param keys The keys to lock
+ * @param action Action to run
+ * @param <T> Result type
+ * @param <E> Exception type
+ * @return Action result
+ * @throws E Exception from the action
+ */
+ public <T, E extends Exception> T withMultipleKeyLockAndThrow(
+ List<? extends Object> keys, EntityCache.ThrowingSupplier<T, E> action)
throws E {
+ waitForGlobalComplete();
+ List<Lock> sortedLocks = getDistinctSortedLocks(keys);
+ acquireAllLocks(sortedLocks);
+ try {
+ return action.get();
+ } finally {
+ releaseAllLocks(sortedLocks);
+ }
+ }
+
+ /** Checks if a global operation is currently in progress. */
+ @VisibleForTesting
+ public boolean isClearing() {
+ return globalOperationLatch.get() != null;
+ }
+
+ /**
+ * Returns number of lock segments.
+ *
+ * @return Number of segments
+ */
+ public int getNumSegments() {
+ return stripedLocks.size();
+ }
+
/**
* Executes a global clearing operation with exclusive access to all
segments. This method sets
* the clearing flag and ensures no other operations can proceed until the
clearing is complete.
@@ -202,6 +277,38 @@ public class SegmentedLock {
}
}
+ private List<Lock> getDistinctSortedLocks(List<? extends Object> keys) {
+ return keys.stream()
+ .map(this::getSegmentLock)
+ .distinct()
+ .sorted(Comparator.comparingInt(lockIndices::get))
+ .collect(Collectors.toList());
+ }
+
+ private void acquireAllLocks(List<Lock> locks) {
+ int lockedCount = 0;
+ try {
+ for (Lock lock : locks) {
+ lock.lockInterruptibly();
+ lockedCount++;
+ }
+ } catch (InterruptedException e) {
+ releaseLocks(locks, lockedCount);
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Thread was interrupted while acquiring
batch lock", e);
+ }
+ }
+
+ private static void releaseLocks(List<Lock> locks, int lockedCount) {
+ for (int i = lockedCount - 1; i >= 0; i--) {
+ locks.get(i).unlock();
+ }
+ }
+
+ private void releaseAllLocks(List<Lock> locks) {
+ releaseLocks(locks, locks.size());
+ }
+
/**
* Waits for any ongoing global operation to complete. This method is called
by regular operations
* to ensure they don't interfere with global operations.
@@ -218,19 +325,4 @@ public class SegmentedLock {
}
}
}
-
- /** Checks if a global operation is currently in progress. */
- @VisibleForTesting
- public boolean isClearing() {
- return globalOperationLatch.get() != null;
- }
-
- /**
- * Returns number of lock segments.
- *
- * @return Number of segments
- */
- public int getNumSegments() {
- return stripedLocks.size();
- }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 2f4becd77e..900065f87a 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -38,6 +38,7 @@ import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.RelationalEntity;
import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.UnsupportedEntityTypeException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -350,6 +351,31 @@ public class JDBCBackend implements RelationalBackend {
case JOB_TEMPLATE:
return (List<E>)
JobTemplateMetaService.getInstance().batchGetJobTemplateByIdentifier(identifiers);
+ case USER:
+ // TODO: I will add batch operations for users, groups and views
+ List<E> users = Lists.newArrayList();
+ for (NameIdentifier identifier : identifiers) {
+ users.add((E)
UserMetaService.getInstance().getUserByIdentifier(identifier));
+ }
+ return users;
+ case GROUP:
+ List<E> groups = Lists.newArrayList();
+ for (NameIdentifier identifier : identifiers) {
+ groups.add((E)
GroupMetaService.getInstance().getGroupByIdentifier(identifier));
+ }
+ return groups;
+ case ROLE:
+ List<E> roles = Lists.newArrayList();
+ for (NameIdentifier identifier : identifiers) {
+ roles.add((E)
RoleMetaService.getInstance().getRoleByIdentifier(identifier));
+ }
+ return roles;
+ case VIEW:
+ List<E> views = Lists.newArrayList();
+ for (NameIdentifier identifier : identifiers) {
+ views.add((E)
ViewMetaService.getInstance().getViewByIdentifier(identifier));
+ }
+ return views;
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for batch get operation", entityType);
@@ -669,6 +695,19 @@ public class JDBCBackend implements RelationalBackend {
}
}
+ @Override
+ public List<RelationalEntity<?>> batchListEntitiesByRelation(
+ Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType
identType)
+ throws IOException {
+ switch (relType) {
+ case OWNER_REL:
+ return OwnerMetaService.getInstance().batchGetOwner(nameIdentifiers,
identType);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Doesn't support the relation type %s", relType));
+ }
+ }
+
@Override
public void insertRelation(
SupportsRelationOperations.Type relType,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 31183f2649..9fdd2c7886 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
@@ -36,10 +38,12 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.RelationalEntity;
import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.cache.CacheFactory;
import org.apache.gravitino.cache.CachedEntityIdResolver;
import org.apache.gravitino.cache.EntityCache;
+import org.apache.gravitino.cache.EntityCacheKey;
import org.apache.gravitino.cache.EntityCacheRelationKey;
import org.apache.gravitino.cache.NoOpsCache;
import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -225,6 +229,47 @@ public class RelationalEntityStore implements EntityStore,
SupportsRelationOpera
});
}
+ @Override
+ public List<RelationalEntity<?>> batchListEntitiesByRelation(
+ Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType
identType)
+ throws IOException {
+ if (nameIdentifiers == null || nameIdentifiers.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List<EntityCacheKey> lockKeys = new ArrayList<>();
+ for (NameIdentifier id : nameIdentifiers) {
+ lockKeys.add(EntityCacheRelationKey.of(id, identType, relType));
+ }
+
+ return cache.withMultipleKeyCacheLock(
+ lockKeys,
+ () -> {
+ List<RelationalEntity<?>> result = new ArrayList<>();
+ List<NameIdentifier> uncachedIdentifiers = new ArrayList<>();
+
+ for (NameIdentifier nameIdentifier : nameIdentifiers) {
+ Optional<List<RelationalEntity<?>>> cachedRelations =
+ getCachedRelations(relType, nameIdentifier, identType);
+ if (cachedRelations.isPresent()) {
+ result.addAll(cachedRelations.get());
+ } else {
+ uncachedIdentifiers.add(nameIdentifier);
+ }
+ }
+
+ if (!uncachedIdentifiers.isEmpty()) {
+ List<RelationalEntity<?>> backendRelations =
+ backend.batchListEntitiesByRelation(relType,
uncachedIdentifiers, identType);
+ result.addAll(backendRelations);
+
+ batchPopulateRelationCache(relType, identType,
uncachedIdentifiers, backendRelations);
+ }
+
+ return result;
+ });
+ }
+
@Override
public <E extends Entity & HasIdentifier> E getEntityByRelation(
Type relType,
@@ -318,4 +363,44 @@ public class RelationalEntityStore implements EntityStore,
SupportsRelationOpera
throws IOException, EntityAlreadyExistsException {
backend.batchPut(entities, overwritten);
}
+
+ private <E extends Entity & HasIdentifier>
Optional<List<RelationalEntity<?>>> getCachedRelations(
+ SupportsRelationOperations.Type relType,
+ NameIdentifier nameIdentifier,
+ Entity.EntityType identType) {
+ Optional<List<E>> entitiesOpt = cache.getIfPresent(relType,
nameIdentifier, identType);
+ if (entitiesOpt.isPresent()) {
+ List<RelationalEntity<?>> cachedRelations = new ArrayList<>();
+ for (E entity : entitiesOpt.get()) {
+ cachedRelations.add(new RelationalEntity<>(relType, nameIdentifier,
identType, entity));
+ }
+ return Optional.of(cachedRelations);
+ }
+ return Optional.empty();
+ }
+
+ private <E extends Entity & HasIdentifier> void batchPopulateRelationCache(
+ SupportsRelationOperations.Type relType,
+ Entity.EntityType identType,
+ List<NameIdentifier> uncachedIdentifiers,
+ List<RelationalEntity<?>> backendRelations) {
+ Map<NameIdentifier, List<RelationalEntity<?>>> relationsBySource = new
HashMap<>();
+ for (RelationalEntity<?> relation : backendRelations) {
+ relationsBySource.computeIfAbsent(relation.source(), k -> new
ArrayList<>()).add(relation);
+ }
+
+ for (NameIdentifier sourceId : uncachedIdentifiers) {
+ List<RelationalEntity<?>> sourceRelations =
relationsBySource.get(sourceId);
+ List<E> entityList = new ArrayList<>();
+ if (sourceRelations != null) {
+ for (RelationalEntity<?> rel : sourceRelations) {
+ @SuppressWarnings("unchecked")
+ E entity = (E) rel.targetEntity();
+ entityList.add(entity);
+ }
+ }
+
+ cache.put(sourceId, identType, relType, entityList);
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
index 455d78fb59..17247163e2 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
@@ -18,8 +18,10 @@
*/
package org.apache.gravitino.storage.relational.mapper;
+import java.util.List;
import org.apache.gravitino.storage.relational.po.GroupPO;
import org.apache.gravitino.storage.relational.po.OwnerRelPO;
+import org.apache.gravitino.storage.relational.po.UserOwnerRelPO;
import org.apache.gravitino.storage.relational.po.UserPO;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Param;
@@ -45,6 +47,13 @@ public interface OwnerMetaMapper {
@Param("metadataObjectId") Long metadataObjectId,
@Param("metadataObjectType") String metadataObjectType);
+ @SelectProvider(
+ type = OwnerMetaSQLProviderFactory.class,
+ method = "batchSelectUserOwnerMetaByMetadataObjectIdAndType")
+ List<UserOwnerRelPO> batchSelectUserOwnerMetaByMetadataObjectIdAndType(
+ @Param("metadataObjectIds") List<Long> metadataObjectIds,
+ @Param("metadataObjectType") String metadataObjectType);
+
@SelectProvider(
type = OwnerMetaSQLProviderFactory.class,
method = "selectGroupOwnerMetaByMetadataObjectIdAndType")
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
index b7b528a300..0dfcdc5bed 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.storage.relational.mapper;
import com.google.common.collect.ImmutableMap;
+import java.util.List;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import
org.apache.gravitino.storage.relational.mapper.provider.base.OwnerMetaBaseSQLProvider;
@@ -96,4 +97,11 @@ public class OwnerMetaSQLProviderFactory {
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
return getProvider().deleteOwnerMetasByLegacyTimeline(legacyTimeline,
limit);
}
+
+ public static String batchSelectUserOwnerMetaByMetadataObjectIdAndType(
+ @Param("metadataObjectIds") List<Long> metadataObjectIds,
+ @Param("metadataObjectType") String metadataObjectType) {
+ return getProvider()
+ .batchSelectUserOwnerMetaByMetadataObjectIdAndType(metadataObjectIds,
metadataObjectType);
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
index e300c5bb44..ece7bfc22a 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
@@ -20,6 +20,7 @@ package
org.apache.gravitino.storage.relational.mapper.provider.base;
import static
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper.OWNER_TABLE_NAME;
+import java.util.List;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
@@ -53,6 +54,34 @@ public class OwnerMetaBaseSQLProvider {
+ " ot.deleted_at = 0 AND ut.deleted_at = 0";
}
+ public String batchSelectUserOwnerMetaByMetadataObjectIdAndType(
+ @Param("metadataObjectIds") List<Long> metadataObjectIds,
+ @Param("metadataObjectType") String metadataObjectType) {
+ return "<script>"
+ + "SELECT ot.metadata_object_id as metadataObjectId,"
+ + "ut.user_id as userId, "
+ + "ut.user_name as userName, "
+ + "ut.metalake_id as metalakeId, "
+ + "ut.audit_info as auditInfo, "
+ + "ut.current_version as currentVersion, "
+ + "ut.last_version as lastVersion, "
+ + "ut.deleted_at as deletedAt "
+ + "FROM "
+ + OWNER_TABLE_NAME
+ + " ot LEFT JOIN "
+ + UserMetaMapper.USER_TABLE_NAME
+ + " ut ON ut.user_id = ot.owner_id "
+ + "WHERE "
+ + "ot.metadata_object_type = #{metadataObjectType} "
+ + "AND ot.owner_type = 'USER' "
+ + "AND ot.metadata_object_id IN "
+ + "<foreach collection='metadataObjectIds' item='itemId' open='('
separator=',' close=')'>"
+ + "#{itemId}"
+ + "</foreach> "
+ + "AND ot.deleted_at = 0 AND ut.deleted_at = 0 "
+ + "</script>";
+ }
+
public String selectGroupOwnerMetaByMetadataObjectIdAndType(
@Param("metadataObjectId") Long metadataObjectId,
@Param("metadataObjectType") String metadataObjectType) {
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java
new file mode 100644
index 0000000000..969b49c1c1
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.po;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class UserOwnerRelPO extends UserPO {
+
+ private Long metadataObjectId;
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
index a68b2ebb3e..2a209e654f 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
@@ -20,15 +20,27 @@ package org.apache.gravitino.storage.relational.service;
import static
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATIONAL_STORE_METRIC_NAME;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.gravitino.Entity;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.RelationalEntity;
+import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.meta.UserEntity;
import org.apache.gravitino.metrics.Monitored;
import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import org.apache.gravitino.storage.relational.po.GroupPO;
import org.apache.gravitino.storage.relational.po.OwnerRelPO;
+import org.apache.gravitino.storage.relational.po.UserOwnerRelPO;
import org.apache.gravitino.storage.relational.po.UserPO;
import org.apache.gravitino.storage.relational.utils.POConverters;
import org.apache.gravitino.storage.relational.utils.SessionUtils;
@@ -79,6 +91,59 @@ public class OwnerMetaService {
return Optional.empty();
}
+ @Monitored(
+ metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+ baseMetricName = "batchGetOwner")
+ public List<RelationalEntity<?>> batchGetOwner(
+ List<NameIdentifier> identifiers, Entity.EntityType type) {
+ if (CollectionUtils.isEmpty(identifiers)) {
+ return new ArrayList<>();
+ }
+ String metalake = NameIdentifierUtil.getMetalake(identifiers.get(0));
+ for (NameIdentifier identifier : identifiers) {
+ Preconditions.checkArgument(
+ Objects.equals(NameIdentifierUtil.getMetalake(identifier), metalake),
+ "identifiers should in one metalake");
+ }
+ List<RelationalEntity<?>> result = new ArrayList<>();
+ Map<Long, NameIdentifier> nameIdentifierMap = new HashMap<>();
+ List<Long> entityIds =
+ identifiers.stream()
+ .map(
+ identifier -> {
+ long entityId = EntityIdService.getEntityId(identifier,
type);
+ nameIdentifierMap.put(entityId, identifier);
+ return entityId;
+ })
+ .collect(Collectors.toList());
+
+ // Get user owners
+ List<UserOwnerRelPO> userPOList =
+ SessionUtils.getWithoutCommit(
+ OwnerMetaMapper.class,
+ mapper ->
+
mapper.batchSelectUserOwnerMetaByMetadataObjectIdAndType(entityIds,
type.name()));
+ if (CollectionUtils.isNotEmpty(userPOList)) {
+ userPOList.forEach(
+ userPO -> {
+ UserEntity userEntity =
+ POConverters.fromUserPO(
+ userPO, Collections.emptyList(),
AuthorizationUtils.ofUserNamespace(metalake));
+ result.add(
+ new RelationalEntity<>(
+ SupportsRelationOperations.Type.OWNER_REL,
+ nameIdentifierMap.get(userPO.getMetadataObjectId()),
+ type,
+ userEntity));
+ });
+ }
+
+ // TODO: Add batch support for group owners when GroupOwnerRelPO and batch
method are available
+ // For now, we only handle user owners in batch mode
+
+ return result;
+ }
+
@Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "setOwner")
public void setOwner(
NameIdentifier entity,
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
index 8f42bb55c6..8d1ad4ca93 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
@@ -23,8 +23,15 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.RelationalEntity;
+import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.authorization.AuthorizationUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.BaseMetalake;
@@ -491,6 +498,179 @@ class TestOwnerMetaService extends TestJDBCBackend {
Assertions.assertEquals(0, countActiveOwnerRel(user.id()));
}
+ @TestTemplate
+ void testBatchGetOwnerWithUserOwners() throws IOException {
+ createAndInsertMakeLake(METALAKE_NAME);
+ CatalogEntity catalog = createAndInsertCatalog(METALAKE_NAME,
CATALOG_NAME);
+ SchemaEntity schema = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME,
SCHEMA_NAME);
+ TableEntity table =
+ createTableEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME),
+ TABLE_NAME,
+ AUDIT_INFO);
+ backend.insert(table, false);
+ TopicEntity topic =
+ createTopicEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME),
+ TOPIC_NAME,
+ AUDIT_INFO);
+ backend.insert(topic, false);
+
+ UserEntity user =
+ createUserEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ AuthorizationUtils.ofUserNamespace(METALAKE_NAME),
+ "user",
+ AUDIT_INFO);
+ backend.insert(user, false);
+
+ OwnerMetaService.getInstance()
+ .setOwner(catalog.nameIdentifier(), catalog.type(),
user.nameIdentifier(), user.type());
+ OwnerMetaService.getInstance()
+ .setOwner(schema.nameIdentifier(), schema.type(),
user.nameIdentifier(), user.type());
+ OwnerMetaService.getInstance()
+ .setOwner(table.nameIdentifier(), table.type(), user.nameIdentifier(),
user.type());
+ OwnerMetaService.getInstance()
+ .setOwner(topic.nameIdentifier(), topic.type(), user.nameIdentifier(),
user.type());
+
+ List<NameIdentifier> identifiers =
+ List.of(
+ catalog.nameIdentifier(),
+ schema.nameIdentifier(),
+ table.nameIdentifier(),
+ topic.nameIdentifier());
+
+ List<RelationalEntity<?>> relations =
+ OwnerMetaService.getInstance().batchGetOwner(identifiers,
Entity.EntityType.CATALOG);
+ Assertions.assertEquals(1, relations.size());
+
+ // catalog is CATALOG type; schema/table/topic share the same owner but
different types.
+ // batchGetOwner queries by a single identType, so query per type.
+ List<RelationalEntity<?>> catalogRelations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(List.of(catalog.nameIdentifier()),
Entity.EntityType.CATALOG);
+ List<RelationalEntity<?>> schemaRelations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(List.of(schema.nameIdentifier()),
Entity.EntityType.SCHEMA);
+ List<RelationalEntity<?>> tableRelations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(List.of(table.nameIdentifier()),
Entity.EntityType.TABLE);
+ List<RelationalEntity<?>> topicRelations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(List.of(topic.nameIdentifier()),
Entity.EntityType.TOPIC);
+
+ Assertions.assertEquals(1, catalogRelations.size());
+ Assertions.assertEquals(catalog.nameIdentifier(),
catalogRelations.get(0).source());
+ Assertions.assertEquals(Entity.EntityType.CATALOG,
catalogRelations.get(0).sourceType());
+ Assertions.assertEquals(Entity.EntityType.USER,
catalogRelations.get(0).targetEntity().type());
+ Assertions.assertEquals(
+ SupportsRelationOperations.Type.OWNER_REL,
catalogRelations.get(0).type());
+
+ Assertions.assertEquals(1, schemaRelations.size());
+ Assertions.assertEquals(schema.nameIdentifier(),
schemaRelations.get(0).source());
+
+ Assertions.assertEquals(1, tableRelations.size());
+ Assertions.assertEquals(table.nameIdentifier(),
tableRelations.get(0).source());
+
+ Assertions.assertEquals(1, topicRelations.size());
+ Assertions.assertEquals(topic.nameIdentifier(),
topicRelations.get(0).source());
+
+ // All targets should be the same user
+ List<RelationalEntity<?>> allRelations =
+ List.of(
+ catalogRelations.get(0),
+ schemaRelations.get(0),
+ tableRelations.get(0),
+ topicRelations.get(0));
+ for (RelationalEntity<?> rel : allRelations) {
+ Assertions.assertEquals(user.nameIdentifier(),
rel.targetEntity().nameIdentifier());
+ }
+ }
+
+ @TestTemplate
+ void testBatchGetOwnerWithMultipleEntitiesOfSameType() throws IOException {
+ createAndInsertMakeLake(METALAKE_NAME);
+ createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME);
+ SchemaEntity schema1 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME,
SCHEMA_NAME);
+ SchemaEntity schema2 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME,
SCHEMA_NAME + "_2");
+ SchemaEntity schema3 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME,
SCHEMA_NAME + "_3");
+
+ UserEntity user1 =
+ createUserEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ AuthorizationUtils.ofUserNamespace(METALAKE_NAME),
+ "user1",
+ AUDIT_INFO);
+ backend.insert(user1, false);
+ UserEntity user2 =
+ createUserEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ AuthorizationUtils.ofUserNamespace(METALAKE_NAME),
+ "user2",
+ AUDIT_INFO);
+ backend.insert(user2, false);
+
+ // schema1 -> user1, schema2 -> user2, schema3 has no owner
+ OwnerMetaService.getInstance()
+ .setOwner(schema1.nameIdentifier(), schema1.type(),
user1.nameIdentifier(), user1.type());
+ OwnerMetaService.getInstance()
+ .setOwner(schema2.nameIdentifier(), schema2.type(),
user2.nameIdentifier(), user2.type());
+
+ List<RelationalEntity<?>> relations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(
+ List.of(
+ schema1.nameIdentifier(), schema2.nameIdentifier(),
schema3.nameIdentifier()),
+ Entity.EntityType.SCHEMA);
+
+ Assertions.assertEquals(2, relations.size());
+
+ Map<NameIdentifier, NameIdentifier> sourceToTarget =
+ relations.stream()
+ .collect(
+ Collectors.toMap(RelationalEntity::source, r ->
r.targetEntity().nameIdentifier()));
+
+ Assertions.assertEquals(user1.nameIdentifier(),
sourceToTarget.get(schema1.nameIdentifier()));
+ Assertions.assertEquals(user2.nameIdentifier(),
sourceToTarget.get(schema2.nameIdentifier()));
+ // schema3 has no owner — not present in results
+
Assertions.assertFalse(sourceToTarget.containsKey(schema3.nameIdentifier()));
+
+ // Verify RelationalEntity metadata
+ for (RelationalEntity<?> rel : relations) {
+ Assertions.assertEquals(SupportsRelationOperations.Type.OWNER_REL,
rel.type());
+ Assertions.assertEquals(Entity.EntityType.SCHEMA, rel.sourceType());
+ Assertions.assertEquals(Entity.EntityType.USER,
rel.targetEntity().type());
+ }
+ }
+
+ @TestTemplate
+ void testBatchGetOwnerWithEmptyInput() {
+ List<RelationalEntity<?>> relations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(Collections.emptyList(), Entity.EntityType.TABLE);
+ Assertions.assertNotNull(relations);
+ Assertions.assertTrue(relations.isEmpty());
+ }
+
+ @TestTemplate
+ void testBatchGetOwnerNoneHaveOwners() throws IOException {
+ createAndInsertMakeLake(METALAKE_NAME);
+ createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME);
+ SchemaEntity schema1 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME,
SCHEMA_NAME);
+ SchemaEntity schema2 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME,
SCHEMA_NAME + "_2");
+
+ List<RelationalEntity<?>> relations =
+ OwnerMetaService.getInstance()
+ .batchGetOwner(
+ List.of(schema1.nameIdentifier(), schema2.nameIdentifier()),
+ Entity.EntityType.SCHEMA);
+
+ Assertions.assertNotNull(relations);
+ Assertions.assertTrue(relations.isEmpty());
+ }
+
private Integer countAllOwnerRel(Long ownerId) {
try (SqlSession sqlSession =
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
index 5a0f1ee8bf..b63251fd96 100644
---
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
+++
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
@@ -33,11 +33,13 @@ import java.util.function.Function;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.Metalake;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.authorization.AuthorizationRequestContext;
import org.apache.gravitino.authorization.GravitinoAuthorizer;
import org.apache.gravitino.dto.tag.MetadataObjectDTO;
@@ -166,6 +168,7 @@ public class MetadataAuthzHelper {
Entity.EntityType entityType,
NameIdentifier[] nameIdentifiers) {
preloadToCache(entityType, nameIdentifiers);
+ preloadOwner(entityType, nameIdentifiers);
return filterByExpression(metalake, expression, entityType,
nameIdentifiers, e -> e);
}
@@ -390,4 +393,21 @@ public class MetadataAuthzHelper {
entityType,
EntityClassMapper.getEntityClass(entityType));
}
+
+ private static void preloadOwner(Entity.EntityType entityType,
NameIdentifier[] nameIdentifiers) {
+ if (!GravitinoEnv.getInstance().cacheEnabled()) {
+ return;
+ }
+ EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+ try {
+ entityStore
+ .relationOperations()
+ .batchListEntitiesByRelation(
+ SupportsRelationOperations.Type.OWNER_REL,
+ Arrays.stream(nameIdentifiers).toList(),
+ entityType);
+ } catch (Exception e) {
+ LOG.warn("Ignore preloadOwner error:{}", e.getMessage(), e);
+ }
+ }
}