This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new ca7faaede2 [#7270] feat(core): Support Statistic manager (#7734)
ca7faaede2 is described below
commit ca7faaede2670d5ccf1c8ff6293125f4b953cf64
Author: roryqi <[email protected]>
AuthorDate: Fri Aug 15 19:04:39 2025 +0800
[#7270] feat(core): Support Statistic manager (#7734)
### What changes were proposed in this pull request?
Support Statistic manager
### Why are the changes needed?
Fix: #7270
### Does this PR introduce _any_ user-facing change?
I will add the document later.
### How was this patch tested?
Add UT.
---
.../stats/SupportsPartitionStatistics.java | 4 +-
.../apache/gravitino/stats/SupportsStatistics.java | 3 +-
.../src/main/java/org/apache/gravitino/Entity.java | 2 +-
.../java/org/apache/gravitino/EntityStore.java | 26 ++
.../java/org/apache/gravitino/GravitinoEnv.java | 7 +
.../gravitino/SupportsRelationOperations.java | 2 +-
.../org/apache/gravitino/meta/StatisticEntity.java | 95 ++++---
.../gravitino/meta/TableStatisticEntity.java | 43 +++
.../apache/gravitino/stats/StatisticManager.java | 213 +++++++++++++++
.../gravitino/storage/relational/JDBCBackend.java | 84 +++++-
.../storage/relational/RelationalBackend.java | 24 ++
.../storage/relational/RelationalEntityStore.java | 14 +
.../storage/relational/po/StatisticPO.java | 4 +-
.../relational/service/StatisticMetaService.java | 7 +-
.../gravitino/stats/TestStatisticManager.java | 287 +++++++++++++++++++++
.../storage/memory/TestMemoryEntityStore.java | 14 +
.../service/TestStatisticMetaService.java | 31 +--
.../storage/relational/utils/TestPOConverters.java | 5 +-
18 files changed, 799 insertions(+), 66 deletions(-)
diff --git
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
index 12eac9e4cc..8d25aba2c2 100644
---
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
+++
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
@@ -43,11 +43,9 @@ public interface SupportsPartitionStatistics {
* @param statisticsToUpdate a list of PartitionUpdateStatistics, where each
* PartitionStatisticsUpdate contains the partition name and a map of
statistic names to their
* values to be updated.
- * @return a list of updated PartitionStatistics, where each
PartitionStatistics contains the
- * partition name and a list of statistics applicable to that partition.
* @throws UnmodifiableStatisticException if any of the statistics to be
updated are unmodifiable
*/
- List<PartitionStatistics> updateStatistics(List<PartitionStatisticsUpdate>
statisticsToUpdate)
+ void updateStatistics(List<PartitionStatisticsUpdate> statisticsToUpdate)
throws UnmodifiableStatisticException;
/**
diff --git
a/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
b/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
index dd8fa8a553..5923b4a089 100644
--- a/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
+++ b/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
@@ -45,9 +45,8 @@ public interface SupportsStatistics {
* illegal, it will throw an IllegalStatisticNameException.
*
* @param statistics a map of statistic names to their values
- * @return a list of updated statistics
*/
- List<Statistic> updateStatistics(Map<String, StatisticValue<?>> statistics)
+ void updateStatistics(Map<String, StatisticValue<?>> statistics)
throws UnmodifiableStatisticException, IllegalStatisticNameException;
/**
diff --git a/core/src/main/java/org/apache/gravitino/Entity.java
b/core/src/main/java/org/apache/gravitino/Entity.java
index d08ddd0011..fc575b72b1 100644
--- a/core/src/main/java/org/apache/gravitino/Entity.java
+++ b/core/src/main/java/org/apache/gravitino/Entity.java
@@ -77,7 +77,7 @@ public interface Entity extends Serializable {
MODEL,
MODEL_VERSION,
POLICY,
- STATISTIC,
+ TABLE_STATISTIC,
JOB_TEMPLATE,
JOB,
AUDIT;
diff --git a/core/src/main/java/org/apache/gravitino/EntityStore.java
b/core/src/main/java/org/apache/gravitino/EntityStore.java
index 2993cefe98..79f5e12e26 100644
--- a/core/src/main/java/org/apache/gravitino/EntityStore.java
+++ b/core/src/main/java/org/apache/gravitino/EntityStore.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.tag.SupportsTagOperations;
@@ -190,6 +191,31 @@ public interface EntityStore extends Closeable {
*/
boolean delete(NameIdentifier ident, EntityType entityType, boolean cascade)
throws IOException;
+ /**
+ * Batch delete entities from the underlying storage by the specified list
of {@link
+ * org.apache.gravitino.NameIdentifier} and {@link EntityType}.
+ *
+ * @param entitiesToDelete the list of pairs of name identifiers and entity
types to be deleted
+ * @param cascade if true, cascade delete the entities, otherwise just
delete the entities
+ * @return the number of entities deleted
+ * @throws IOException if the batch delete operation fails
+ */
+ int batchDelete(List<Pair<NameIdentifier, EntityType>> entitiesToDelete,
boolean cascade)
+ throws IOException;
+
+ /**
+ * Batch put entities into the underlying storage.
+ *
+ * @param entities the list of entities to be stored
+ * @param overwritten if true, overwrite the existing entities, otherwise
throw an
+ * @param <E> the type of the entities
+ * @throws IOException if the batch put operation fails
+ * @throws EntityAlreadyExistsException if the entity already exists and the
overwritten flag is
+ * false
+ */
+ <E extends Entity & HasIdentifier> void batchPut(List<E> entities, boolean
overwritten)
+ throws IOException, EntityAlreadyExistsException;
+
/**
* Execute the specified {@link Executable} in a transaction.
*
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index cdc90ee63d..591542a36d 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -80,6 +80,7 @@ import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.JVMMetricsSource;
import org.apache.gravitino.policy.PolicyDispatcher;
import org.apache.gravitino.policy.PolicyManager;
+import org.apache.gravitino.stats.StatisticManager;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.tag.TagDispatcher;
@@ -146,6 +147,7 @@ public class GravitinoEnv {
private OwnerDispatcher ownerDispatcher;
private FutureGrantManager futureGrantManager;
private GravitinoAuthorizer gravitinoAuthorizer;
+ private StatisticManager statisticManager;
protected GravitinoEnv() {}
@@ -413,6 +415,10 @@ public class GravitinoEnv {
return jobOperationDispatcher;
}
+ public StatisticManager statisticManager() {
+ return statisticManager;
+ }
+
public void start() {
metricsSystem.start();
eventListenerManager.start();
@@ -557,6 +563,7 @@ public class GravitinoEnv {
ModelNormalizeDispatcher modelNormalizeDispatcher =
new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
this.modelDispatcher = new ModelEventDispatcher(eventBus,
modelNormalizeDispatcher);
+ this.statisticManager = new StatisticManager(entityStore, idGenerator);
// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
diff --git
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index f64b60123d..5add56c640 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -39,7 +39,7 @@ public interface SupportsRelationOperations {
/** Role and group relationship */
ROLE_GROUP_REL,
/** Policy and metadata object relationship */
- POLICY_METADATA_OBJECT_REL,
+ POLICY_METADATA_OBJECT_REL
}
/**
diff --git a/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
index 284b347a73..546d2401f7 100644
--- a/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
@@ -25,10 +25,11 @@ import org.apache.gravitino.Auditable;
import org.apache.gravitino.Entity;
import org.apache.gravitino.Field;
import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.stats.StatisticValue;
-public class StatisticEntity implements Entity, HasIdentifier, Auditable {
+public abstract class StatisticEntity implements Entity, HasIdentifier,
Auditable {
public static final Field ID =
Field.required("id", Long.class, "The unique identifier of the statistic
entity.");
public static final Field NAME =
@@ -38,11 +39,11 @@ public class StatisticEntity implements Entity,
HasIdentifier, Auditable {
public static final Field AUDIT_INFO =
Field.required("audit_info", Audit.class, "The audit details of the
statistic entity.");
- private Long id;
- private String name;
- private StatisticValue<?> value;
- private AuditInfo auditInfo;
- private Namespace namespace;
+ protected Long id;
+ protected String name;
+ protected StatisticValue<?> value;
+ protected AuditInfo auditInfo;
+ protected Namespace namespace;
@Override
public Audit auditInfo() {
@@ -59,11 +60,6 @@ public class StatisticEntity implements Entity,
HasIdentifier, Auditable {
return fields;
}
- @Override
- public EntityType type() {
- return EntityType.STATISTIC;
- }
-
@Override
public String name() {
return name;
@@ -83,45 +79,74 @@ public class StatisticEntity implements Entity,
HasIdentifier, Auditable {
return value;
}
- public static Builder builder() {
- return new Builder();
+ public static EntityType getStatisticType(MetadataObject.Type type) {
+ switch (type) {
+ case TABLE:
+ return EntityType.TABLE_STATISTIC;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported metadata object type for statistics: " + type);
+ }
}
- public static class Builder {
- private final StatisticEntity statisticEntity;
+ public static <S extends StatisticEntityBuilder<S, E>, E extends
StatisticEntity> S builder(
+ Entity.EntityType type) {
+ switch (type) {
+ case TABLE_STATISTIC:
+ return (S) TableStatisticEntity.builder();
+ default:
+ throw new IllegalArgumentException("Unsupported statistic entity type:
" + type);
+ }
+ }
- private Builder() {
- statisticEntity = new StatisticEntity();
+ interface Builder<SELF extends Builder<SELF, T>, T extends StatisticEntity> {
+ T build();
+ }
+
+ public abstract static class StatisticEntityBuilder<
+ SELF extends Builder<SELF, T>, T extends StatisticEntity>
+ implements Builder<SELF, T> {
+
+ protected Long id;
+ protected String name;
+ protected StatisticValue<?> value;
+ protected AuditInfo auditInfo;
+ protected Namespace namespace;
+
+ public SELF withId(Long id) {
+ this.id = id;
+ return self();
}
- public Builder withId(Long id) {
- statisticEntity.id = id;
- return this;
+ public SELF withName(String name) {
+ this.name = name;
+ return self();
}
- public Builder withName(String name) {
- statisticEntity.name = name;
- return this;
+ public SELF withValue(StatisticValue<?> value) {
+ this.value = value;
+ return self();
}
- public Builder withValue(StatisticValue<?> value) {
- statisticEntity.value = value;
- return this;
+ public SELF withAuditInfo(AuditInfo auditInfo) {
+ this.auditInfo = auditInfo;
+ return self();
}
- public Builder withAuditInfo(AuditInfo auditInfo) {
- statisticEntity.auditInfo = auditInfo;
- return this;
+ public SELF withNamespace(Namespace namespace) {
+ this.namespace = namespace;
+ return self();
}
- public Builder withNamespace(Namespace namespace) {
- statisticEntity.namespace = namespace;
- return this;
+ private SELF self() {
+ return (SELF) this;
}
- public StatisticEntity build() {
- statisticEntity.validate();
- return statisticEntity;
+ protected abstract T internalBuild();
+
+ public T build() {
+ T t = internalBuild();
+ return t;
}
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/meta/TableStatisticEntity.java
b/core/src/main/java/org/apache/gravitino/meta/TableStatisticEntity.java
new file mode 100644
index 0000000000..04c25403f8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/TableStatisticEntity.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+public class TableStatisticEntity extends StatisticEntity {
+ @Override
+ public EntityType type() {
+ return EntityType.TABLE_STATISTIC;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder extends StatisticEntityBuilder<Builder,
TableStatisticEntity> {
+ @Override
+ protected TableStatisticEntity internalBuild() {
+ TableStatisticEntity entity = new TableStatisticEntity();
+ entity.id = id;
+ entity.name = name;
+ entity.value = value;
+ entity.auditInfo = auditInfo;
+ entity.namespace = namespace;
+ return entity;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
new file mode 100644
index 0000000000..c1fc9bc892
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.Executable;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatisticManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StatisticManager.class);
+
+ private final EntityStore store;
+
+ private final IdGenerator idGenerator;
+
+ public StatisticManager(EntityStore store, IdGenerator idGenerator) {
+ this.store = store;
+ this.idGenerator = idGenerator;
+ }
+
+ public List<Statistic> listStatistics(String metalake, MetadataObject
metadataObject) {
+ try {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+ Entity.EntityType type =
StatisticEntity.getStatisticType(metadataObject.type());
+ return TreeLockUtils.doWithTreeLock(
+ identifier,
+ LockType.READ,
+ () ->
+ store.list(Namespace.fromString(identifier.toString()),
StatisticEntity.class, type)
+ .stream()
+ .map(
+ entity -> {
+ String name = entity.name();
+ StatisticValue<?> value = entity.value();
+ return new CustomStatistic(name, value);
+ })
+ .collect(Collectors.toList()));
+ } catch (NoSuchEntityException nse) {
+ LOG.warn(
+ "Failed to list statistics for metadata object {} in the metalake
{}: {}",
+ metadataObject.fullName(),
+ metalake,
+ nse.getMessage());
+ throw new NoSuchMetadataObjectException(
+ "The metadata object %s in the metalake %s isn't found",
+ metadataObject.fullName(), metalake);
+ } catch (IOException ioe) {
+ LOG.error(
+ "Failed to list statistics for metadata object {} in the metalake {}
: {}",
+ metadataObject.fullName(),
+ metalake,
+ ioe.getMessage());
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ public void updateStatistics(
+ String metalake, MetadataObject metadataObject, Map<String,
StatisticValue<?>> statistics) {
+ try {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+ List<StatisticEntity> statisticEntities = Lists.newArrayList();
+ for (Map.Entry<String, StatisticValue<?>> entry : statistics.entrySet())
{
+ String name = entry.getKey();
+ StatisticValue<?> value = entry.getValue();
+
+ StatisticEntity statistic =
+
StatisticEntity.builder(StatisticEntity.getStatisticType(metadataObject.type()))
+ .withId(idGenerator.nextId())
+ .withName(name)
+ .withValue(value)
+ .withNamespace(Namespace.fromString(identifier.toString()))
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+ .withLastModifiedTime(Instant.now())
+ .build())
+ .build();
+ statisticEntities.add(statistic);
+ }
+ TreeLockUtils.doWithTreeLock(
+ identifier,
+ LockType.WRITE,
+ (Executable<Void, IOException>)
+ () -> {
+ store.batchPut(statisticEntities, true);
+ return null;
+ });
+
+ } catch (NoSuchEntityException nse) {
+ LOG.warn(
+ "Failed to update statistics for metadata object {} in the metalake
{}: {}",
+ metadataObject.fullName(),
+ metalake,
+ nse.getMessage());
+ throw new NoSuchMetadataObjectException(
+ "The metadata object %s in the metalake %s isn't found",
+ metadataObject.fullName(), metalake);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ public boolean dropStatistics(
+ String metalake, MetadataObject metadataObject, List<String> statistics)
+ throws UnmodifiableStatisticException {
+ try {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+ Entity.EntityType type =
StatisticEntity.getStatisticType(metadataObject.type());
+ List<Pair<NameIdentifier, Entity.EntityType>> idents =
Lists.newArrayList();
+
+ for (String statistic : statistics) {
+ Pair<NameIdentifier, Entity.EntityType> pair =
+ Pair.of(NameIdentifierUtil.ofStatistic(identifier, statistic),
type);
+ idents.add(pair);
+ }
+ int deleteCount =
+ TreeLockUtils.doWithTreeLock(
+ identifier, LockType.WRITE, () -> store.batchDelete(idents,
true));
+ // If deleteCount is 0, it means that the statistics were not found.
+ return deleteCount != 0;
+ } catch (NoSuchEntityException nse) {
+ LOG.warn(
+ "Failed to drop statistics for metadata object {} in the metalake {}
: {}",
+ metadataObject.fullName(),
+ metalake,
+ nse.getMessage());
+ throw new NoSuchMetadataObjectException(
+ "The metadata object %s in the metalake %s isn't found",
+ metadataObject.fullName(), metalake);
+ } catch (IOException ioe) {
+ LOG.error(
+ "Failed to drop statistics for metadata object {} in the metalake {}
: {}",
+ metadataObject.fullName(),
+ metalake,
+ ioe.getMessage());
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private static class CustomStatistic implements Statistic {
+
+ private final String name;
+ private final StatisticValue<?> value;
+
+ CustomStatistic(String name, StatisticValue<?> value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Optional<StatisticValue<?>> value() {
+ return Optional.of(value);
+ }
+
+ @Override
+ public boolean reserved() {
+ return false;
+ }
+
+ @Override
+ public boolean modifiable() {
+ return true;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index a72182d5df..f1168d493f 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -20,13 +20,17 @@
package org.apache.gravitino.storage.relational;
import static
org.apache.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT;
+import static org.apache.gravitino.Entity.EntityType.TABLE;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
@@ -49,6 +53,7 @@ import org.apache.gravitino.meta.ModelVersionEntity;
import org.apache.gravitino.meta.PolicyEntity;
import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.StatisticEntity;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.meta.TagEntity;
import org.apache.gravitino.meta.TopicEntity;
@@ -138,6 +143,11 @@ public class JDBCBackend implements RelationalBackend {
JobTemplateMetaService.getInstance().listJobTemplatesByNamespace(namespace);
case JOB:
return (List<E>)
JobMetaService.getInstance().listJobsByNamespace(namespace);
+ case TABLE_STATISTIC:
+ return (List<E>)
+ StatisticMetaService.getInstance()
+ .listStatisticsByEntity(
+ NameIdentifier.parse(namespace.toString()),
Entity.EntityType.TABLE);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
@@ -375,7 +385,7 @@ public class JDBCBackend implements RelationalBackend {
return ModelVersionMetaService.getInstance()
.deleteModelVersionMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
- case STATISTIC:
+ case TABLE_STATISTIC:
return StatisticMetaService.getInstance()
.deleteStatisticsByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
@@ -413,7 +423,7 @@ public class JDBCBackend implements RelationalBackend {
case TAG:
case MODEL:
case MODEL_VERSION:
- case STATISTIC:
+ case TABLE_STATISTIC:
case JOB_TEMPLATE:
case JOB:
// These entity types have not implemented multi-versions, so we can
skip.
@@ -476,6 +486,76 @@ public class JDBCBackend implements RelationalBackend {
.associateTagsWithMetadataObject(objectIdent, objectType, tagsToAdd,
tagsToRemove);
}
+ @Override
+ public int batchDelete(
+ List<Pair<NameIdentifier, Entity.EntityType>> entitiesToDelete, boolean
cascade)
+ throws IOException {
+ if (entitiesToDelete == null || entitiesToDelete.isEmpty()) {
+ return 0;
+ }
+ Preconditions.checkArgument(
+ 1 ==
entitiesToDelete.stream().collect(Collectors.groupingBy(Pair::getRight)).size(),
+ "All entities must be of the same type for batch delete operation.");
+ Entity.EntityType entityType = entitiesToDelete.get(0).getRight();
+ switch (entityType) {
+ case TABLE_STATISTIC:
+ Preconditions.checkArgument(
+ cascade, "Batch delete for statistics must be cascade deleted.");
+ List<NameIdentifier> deleteIdents =
+
entitiesToDelete.stream().map(Pair::getLeft).collect(Collectors.toList());
+ int namespaceSize =
+ entitiesToDelete.stream()
+ .collect(Collectors.groupingBy(ident ->
ident.getLeft().namespace()))
+ .size();
+ Preconditions.checkArgument(
+ 1 == namespaceSize,
+ "All entities must be in the same namespace for batch delete
operation.");
+
+ Namespace namespace = deleteIdents.get(0).namespace();
+ return StatisticMetaService.getInstance()
+ .batchDeleteStatisticPOs(
+ NameIdentifier.parse(namespace.toString()),
+ TABLE,
+
deleteIdents.stream().map(NameIdentifier::name).collect(Collectors.toList()));
+ default:
+ throw new IllegalArgumentException(
+ String.format("Batch delete is not supported for entity type %s",
entityType.name()));
+ }
+ }
+
+ @Override
+ public <E extends Entity & HasIdentifier> void batchPut(List<E> entities,
boolean overwritten)
+ throws IOException, EntityAlreadyExistsException {
+ if (entities.isEmpty()) {
+ return;
+ }
+
+ Preconditions.checkArgument(
+ 1 ==
entities.stream().collect(Collectors.groupingBy(Entity::type)).size(),
+ "All entities must be of the same type for batchPut operation.");
+ Entity.EntityType entityType = entities.get(0).type();
+
+ switch (entityType) {
+ case TABLE_STATISTIC:
+ Preconditions.checkArgument(overwritten, "Batch put for statistics
must be overwritten.");
+ List<StatisticEntity> statisticEntities =
+ entities.stream().map(e -> (StatisticEntity)
e).collect(Collectors.toList());
+ Preconditions.checkArgument(
+ 1 ==
entities.stream().collect(Collectors.groupingBy(HasIdentifier::namespace)).size(),
+ "All entities must be in the same namespace for batchPut
operation.");
+
+ StatisticMetaService.getInstance()
+ .batchInsertStatisticPOsOnDuplicateKeyUpdate(
+ statisticEntities,
+
NameIdentifier.parse(statisticEntities.get(0).namespace().toString()),
+ Entity.EntityType.TABLE);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Batch put is not supported for entity type %s",
entityType.name()));
+ }
+ }
+
@Override
public <E extends Entity & HasIdentifier> List<E> listEntitiesByRelation(
Type relType, NameIdentifier nameIdentifier, Entity.EntityType
identType, boolean allFields)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
index 95a58e86fd..42c21e9a5d 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Config;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
@@ -126,6 +127,29 @@ public interface RelationalBackend
boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean
cascade)
throws IOException;
+ /**
+ * Deletes the entities in the specified namespace and entity type.
+ *
+ * @param entitiesToDelete The list of identifiers and their entity types to
be deleted.
+ * @param cascade True, If you need to cascade delete entities, else false.
+ * @return The count of the deleted entities.
+ * @throws IOException If the store operation fails
+ */
+ int batchDelete(List<Pair<NameIdentifier, Entity.EntityType>>
entitiesToDelete, boolean cascade)
+ throws IOException;
+
+ /**
+ * Stores a batch of entities, possibly overwriting existing entities if
specified.
+ *
+ * @param entities The list of entities to be stored.
+ * @param overwritten If true, overwrites existing entities with the same
identifier.
+ * @param <E> The type of the entities in the list.
+ * @throws IOException If the store operation fails
+ * @throws EntityAlreadyExistsException If an entity already exists and
overwrite is false.
+ */
+ <E extends Entity & HasIdentifier> void batchPut(List<E> entities, boolean
overwritten)
+ throws IOException, EntityAlreadyExistsException;
+
/**
* Permanently deletes the legacy data that has been marked as deleted
before the given legacy
* timeline.
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 9ef62d656a..1a71fb0070 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
@@ -260,4 +261,17 @@ public class RelationalEntityStore
return backend.updateEntityRelations(
relType, srcEntityIdent, srcEntityType, destEntitiesToAdd,
destEntitiesToRemove);
}
+
+ @Override
+ public int batchDelete(
+ List<Pair<NameIdentifier, Entity.EntityType>> entitiesToDelete, boolean
cascade)
+ throws IOException {
+ return backend.batchDelete(entitiesToDelete, cascade);
+ }
+
+ @Override
+ public <E extends Entity & HasIdentifier> void batchPut(List<E> entities,
boolean overwritten)
+ throws IOException, EntityAlreadyExistsException {
+ backend.batchPut(entities, overwritten);
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
index 51dd1e0066..fa3de359b2 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
@@ -57,7 +57,9 @@ public class StatisticPO {
public static StatisticEntity fromStatisticPO(StatisticPO statisticPO) {
try {
- return StatisticEntity.builder()
+ return StatisticEntity.builder(
+ StatisticEntity.getStatisticType(
+ MetadataObject.Type.valueOf(statisticPO.metadataObjectType)))
.withId(statisticPO.getStatisticId())
.withName(statisticPO.getStatisticName())
.withValue(
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
index 8b92cc5a25..e455a4c3b2 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
@@ -59,13 +59,12 @@ public class StatisticMetaService {
}
public void batchInsertStatisticPOsOnDuplicateKeyUpdate(
- List<StatisticEntity> statisticEntities,
- String metalake,
- NameIdentifier entity,
- Entity.EntityType type) {
+ List<StatisticEntity> statisticEntities, NameIdentifier entity,
Entity.EntityType type) {
if (statisticEntities == null || statisticEntities.isEmpty()) {
return;
}
+
+ String metalake = NameIdentifierUtil.getMetalake(entity);
Long metalakeId =
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
MetadataObject object = NameIdentifierUtil.toMetadataObject(entity, type);
Long entityId =
diff --git
a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
new file mode 100644
index 0000000000..e430d9486c
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.stats;
+
+import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
+import static
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
+import static
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
+import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
+import static
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS;
+import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_STORE;
+import static org.apache.gravitino.Configs.ENTITY_STORE;
+import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
+import static org.apache.gravitino.Configs.STORE_DELETE_AFTER_TIME;
+import static org.apache.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestStatisticManager {
+ private static final String JDBC_STORE_PATH =
+ "/tmp/gravitino_jdbc_entityStore_" +
UUID.randomUUID().toString().replace("-", "");
+
+ private static final String DB_DIR = JDBC_STORE_PATH + "/testdb";
+ private static final String METALAKE = "metalake_for_stat_test";
+
+ private static final String CATALOG = "catalog_for_stat_test";
+
+ private static final String SCHEMA = "schema_for_stat_test";
+
+ private static final String TABLE = "table_for_stat_test";
+
+ private static final String COLUMN = "column_for_stat_test";
+ private static final Config config = Mockito.mock(Config.class);
+
+ private static EntityStore entityStore;
+
+ private static IdGenerator idGenerator;
+
+ @BeforeAll
+ public static void setUp() throws IOException, IllegalAccessException {
+ idGenerator = new RandomIdGenerator();
+
+ File dbDir = new File(DB_DIR);
+ dbDir.mkdirs();
+
+ Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
+
Mockito.when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
+ Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL))
+
.thenReturn(String.format("jdbc:h2:file:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL",
DB_DIR));
+
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver");
+
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS)).thenReturn(100);
+
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS)).thenReturn(1000L);
+
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
+ Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 *
1000L);
+ Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
+ // Fix cache config for test
+ Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(true);
+ Mockito.when(config.get(Configs.CACHE_MAX_ENTRIES)).thenReturn(10_000);
+
Mockito.when(config.get(Configs.CACHE_EXPIRATION_TIME)).thenReturn(3_600_000L);
+ Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
+ Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
+
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+
+ Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+ Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+ Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
+
+ entityStore = EntityStoreFactory.createEntityStore(config);
+ entityStore.initialize(config);
+
+ AuditInfo audit =
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+ BaseMetalake metalake =
+ BaseMetalake.builder()
+ .withId(idGenerator.nextId())
+ .withName(METALAKE)
+ .withVersion(SchemaVersion.V_0_1)
+ .withComment("Test metalake")
+ .withAuditInfo(audit)
+ .build();
+ entityStore.put(metalake, false /* overwritten */);
+
+ CatalogEntity catalog =
+ CatalogEntity.builder()
+ .withId(idGenerator.nextId())
+ .withName(CATALOG)
+ .withNamespace(Namespace.of(METALAKE))
+ .withType(Catalog.Type.RELATIONAL)
+ .withProvider("test")
+ .withComment("Test catalog")
+ .withAuditInfo(audit)
+ .build();
+ entityStore.put(catalog, false /* overwritten */);
+
+ SchemaEntity schema =
+ SchemaEntity.builder()
+ .withId(idGenerator.nextId())
+ .withName(SCHEMA)
+ .withNamespace(Namespace.of(METALAKE, CATALOG))
+ .withComment("Test schema")
+ .withAuditInfo(audit)
+ .build();
+ entityStore.put(schema, false /* overwritten */);
+
+ ColumnEntity column =
+ ColumnEntity.builder()
+ .withId(idGenerator.nextId())
+ .withName(COLUMN)
+ .withPosition(0)
+ .withComment("Test column")
+ .withDataType(Types.IntegerType.get())
+ .withNullable(true)
+ .withAutoIncrement(false)
+ .withAuditInfo(audit)
+ .build();
+
+ TableEntity table =
+ TableEntity.builder()
+ .withId(idGenerator.nextId())
+ .withName(TABLE)
+ .withColumns(Lists.newArrayList(column))
+ .withNamespace(Namespace.of(METALAKE, CATALOG, SCHEMA))
+ .withAuditInfo(audit)
+ .build();
+ entityStore.put(table, false /* overwritten */);
+ }
+
+ @AfterAll
+ public static void tearDown() throws IOException {
+ if (entityStore != null) {
+ entityStore.close();
+ entityStore = null;
+ }
+
+ FileUtils.deleteDirectory(new File(JDBC_STORE_PATH));
+ }
+
+ @Test
+ public void testStatisticLifeCycle() {
+ StatisticManager statisticManager = new StatisticManager(entityStore,
idGenerator);
+
+ MetadataObject tableObject =
+ MetadataObjects.of(Lists.newArrayList(CATALOG, SCHEMA, TABLE),
MetadataObject.Type.TABLE);
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+ // Update statistics
+ stats.put("a", StatisticValues.stringValue("1"));
+ stats.put("b", StatisticValues.longValue(1L));
+ stats.put("c", StatisticValues.doubleValue(1.0));
+ stats.put("d", StatisticValues.booleanValue(true));
+ stats.put(
+ "e",
+ StatisticValues.listValue(
+ Lists.newArrayList(
+ StatisticValues.stringValue("1"),
StatisticValues.stringValue("2"))));
+
+ Map<String, StatisticValue<?>> map = Maps.newHashMap();
+ map.put("x", StatisticValues.stringValue("1"));
+ map.put("y", StatisticValues.longValue(2L));
+ stats.put("f", StatisticValues.objectValue(map));
+ statisticManager.updateStatistics(METALAKE, tableObject, stats);
+
+ List<Statistic> statistics = statisticManager.listStatistics(METALAKE,
tableObject);
+ Assertions.assertEquals(6, statistics.size());
+ for (Statistic statistic : statistics) {
+ Assertions.assertTrue(
+ stats.containsKey(statistic.name()),
+ "Statistic name should be in the updated stats: " +
statistic.name());
+ StatisticValue<?> value = stats.get(statistic.name());
+ Assertions.assertEquals(
+ value, statistic.value().get(), "Statistic value type mismatch: " +
statistic.name());
+ }
+
+ // Update partial statistics
+ Map<String, StatisticValue<?>> expectStats = Maps.newHashMap();
+ expectStats.putAll(stats);
+ stats.clear();
+ stats.put("f", StatisticValues.longValue(2L));
+ stats.put("x", StatisticValues.longValue(2L));
+
+ expectStats.put("f", StatisticValues.longValue(2L));
+ expectStats.put("x", StatisticValues.longValue(2));
+
+ statisticManager.updateStatistics(METALAKE, tableObject, stats);
+ statistics = statisticManager.listStatistics(METALAKE, tableObject);
+ Assertions.assertEquals(7, statistics.size());
+ for (Statistic statistic : statistics) {
+ Assertions.assertTrue(
+ expectStats.containsKey(statistic.name()),
+ "Statistic name should be in the updated stats: " +
statistic.name());
+ StatisticValue<?> value = expectStats.get(statistic.name());
+ Assertions.assertEquals(
+ value, statistic.value().get(), "Statistic value type mismatch: " +
statistic.name());
+ }
+
+ // Drop statistics
+ expectStats.remove("a");
+ expectStats.remove("b");
+ expectStats.remove("c");
+ List<String> statNames = Lists.newArrayList("a", "b", "c");
+ statisticManager.dropStatistics(METALAKE, tableObject, statNames);
+ statistics = statisticManager.listStatistics(METALAKE, tableObject);
+ Assertions.assertEquals(4, statistics.size());
+
+ for (Statistic statistic : statistics) {
+ Assertions.assertTrue(
+ expectStats.containsKey(statistic.name()),
+ "Statistic name should be in the updated stats: " +
statistic.name());
+ StatisticValue<?> value = expectStats.get(statistic.name());
+ Assertions.assertEquals(
+ value, statistic.value().get(), "Statistic value type mismatch: " +
statistic.name());
+ }
+
+ // List not-existed metadata object statistics
+ MetadataObject notExistObject =
+ MetadataObjects.of(
+ Lists.newArrayList(CATALOG, SCHEMA, "not-exist"),
MetadataObject.Type.TABLE);
+ Assertions.assertThrows(
+ NoSuchMetadataObjectException.class,
+ () -> statisticManager.listStatistics(METALAKE, notExistObject));
+
+ // Update not-existed metadata object statistics
+ Assertions.assertThrows(
+ NoSuchMetadataObjectException.class,
+ () -> statisticManager.updateStatistics(METALAKE, notExistObject,
stats));
+
+ // Drop statistics
+ Assertions.assertThrows(
+ NoSuchMetadataObjectException.class,
+ () -> statisticManager.dropStatistics(METALAKE, notExistObject,
statNames));
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
b/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
index bdb37895d2..725d440dde 100644
---
a/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
+++
b/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Config;
import org.apache.gravitino.Entity;
import org.apache.gravitino.Entity.EntityType;
@@ -170,6 +171,19 @@ public class TestMemoryEntityStore {
}
}
+ @Override
+ public int batchDelete(List<Pair<NameIdentifier, EntityType>>
entitiesToDelete, boolean cascade)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Batch delete is not supported in InMemoryEntityStore.");
+ }
+
+ @Override
+ public <E extends Entity & HasIdentifier> void batchPut(List<E> entities,
boolean overwritten)
+ throws IOException, EntityAlreadyExistsException {
+ throw new UnsupportedOperationException("Batch put is not supported in
InMemoryEntityStore.");
+ }
+
@Override
public void close() throws IOException {
entityMap.clear();
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
index a45705377d..1ffd191ae6 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
@@ -31,6 +31,7 @@ import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.StatisticEntity;
import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TableStatisticEntity;
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.stats.StatisticValues;
import org.apache.gravitino.storage.RandomIdGenerator;
@@ -76,7 +77,7 @@ public class TestStatisticMetaService extends TestJDBCBackend
{
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, table.nameIdentifier(),
Entity.EntityType.TABLE);
+ statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
List<StatisticEntity> listEntities =
statisticMetaService.listStatisticsByEntity(
@@ -90,7 +91,7 @@ public class TestStatisticMetaService extends TestJDBCBackend
{
statisticEntities.clear();
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, table.nameIdentifier(),
Entity.EntityType.TABLE);
+ statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
listEntities =
statisticMetaService.listStatisticsByEntity(
@@ -165,25 +166,25 @@ public class TestStatisticMetaService extends
TestJDBCBackend {
StatisticEntity statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, table.nameIdentifier(),
Entity.EntityType.TABLE);
+ statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, topic.nameIdentifier(),
Entity.EntityType.TOPIC);
+ statisticEntities, topic.nameIdentifier(), Entity.EntityType.TOPIC);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, fileset.nameIdentifier(),
Entity.EntityType.FILESET);
+ statisticEntities, fileset.nameIdentifier(),
Entity.EntityType.FILESET);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, model.nameIdentifier(),
Entity.EntityType.MODEL);
+ statisticEntities, model.nameIdentifier(), Entity.EntityType.MODEL);
// assert stats
Assertions.assertEquals(4, countActiveStats(metalake.id()));
@@ -278,25 +279,25 @@ public class TestStatisticMetaService extends
TestJDBCBackend {
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, table.nameIdentifier(),
Entity.EntityType.TABLE);
+ statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, topic.nameIdentifier(),
Entity.EntityType.TOPIC);
+ statisticEntities, topic.nameIdentifier(), Entity.EntityType.TOPIC);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, fileset.nameIdentifier(),
Entity.EntityType.FILESET);
+ statisticEntities, fileset.nameIdentifier(),
Entity.EntityType.FILESET);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, model.nameIdentifier(),
Entity.EntityType.MODEL);
+ statisticEntities, model.nameIdentifier(), Entity.EntityType.MODEL);
// assert stats
Assertions.assertEquals(4, countActiveStats(metalake.id()));
@@ -359,25 +360,25 @@ public class TestStatisticMetaService extends
TestJDBCBackend {
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, table.nameIdentifier(),
Entity.EntityType.TABLE);
+ statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, topic.nameIdentifier(),
Entity.EntityType.TOPIC);
+ statisticEntities, topic.nameIdentifier(), Entity.EntityType.TOPIC);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, fileset.nameIdentifier(),
Entity.EntityType.FILESET);
+ statisticEntities, fileset.nameIdentifier(),
Entity.EntityType.FILESET);
statisticEntities.clear();
statisticEntity = createStatisticEntity(auditInfo, 100L);
statisticEntities.add(statisticEntity);
statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
- statisticEntities, metalakeName, model.nameIdentifier(),
Entity.EntityType.MODEL);
+ statisticEntities, model.nameIdentifier(), Entity.EntityType.MODEL);
// assert stats count
Assertions.assertEquals(4, countActiveStats(metalake.id()));
@@ -392,7 +393,7 @@ public class TestStatisticMetaService extends
TestJDBCBackend {
}
private static StatisticEntity createStatisticEntity(AuditInfo auditInfo,
long value) {
- return StatisticEntity.builder()
+ return TableStatisticEntity.builder()
.withId(RandomIdGenerator.INSTANCE.nextId())
.withName("test")
.withValue(StatisticValues.longValue(value))
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index 31211eab23..9d72d31fa9 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.SchemaVersion;
import org.apache.gravitino.meta.StatisticEntity;
import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TableStatisticEntity;
import org.apache.gravitino.meta.TagEntity;
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.policy.Policy;
@@ -1212,7 +1213,7 @@ public class TestPOConverters {
public void testStatisticPO() throws JsonProcessingException {
List<StatisticEntity> statisticEntities = Lists.newArrayList();
statisticEntities.add(
- StatisticEntity.builder()
+ TableStatisticEntity.builder()
.withId(1L)
.withName("test_statistic")
.withNamespace(
@@ -1241,7 +1242,7 @@ public class TestPOConverters {
.withStatisticName("test")
.withStatisticValue("\"test\"")
.withMetadataObjectId(1L)
- .withMetadataObjectType("CATALOG")
+ .withMetadataObjectType("TABLE")
.withDeletedAt(0L)
.withMetalakeId(1L)
.withAuditInfo(