This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new e7849f284d [#7825] feat(core, server): Add partition statistic manager
and partition REST API (#7876)
e7849f284d is described below
commit e7849f284d54135368247d9897c3efb35e18c478
Author: roryqi <[email protected]>
AuthorDate: Wed Aug 20 14:05:30 2025 +0800
[#7825] feat(core, server): Add partition statistic manager and partition
REST API (#7876)
### What changes were proposed in this pull request?
1. Extract PartitionStatisticsUpdate and PartitionStatisticsDrop
interfaces
2. Add partition statistic manager
3. Add partition REST API
### Why are the changes needed?
Fix: #7825
### Does this PR introduce _any_ user-facing change?
I will add the document later.
### How was this patch tested?
Add UT.
---
.../exceptions/IllegalStatisticNameException.java | 6 +-
.../gravitino/stats/PartitionStatistics.java | 2 +-
.../gravitino/stats/PartitionStatisticsDrop.java | 31 +-
.../stats/PartitionStatisticsModification.java | 122 ++++++++
.../gravitino/stats/PartitionStatisticsUpdate.java | 31 +-
.../requests/PartitionStatisticsDropRequest.java | 66 ++++
.../requests/PartitionStatisticsUpdateRequest.java | 66 ++++
.../responses/PartitionStatisticsListResponse.java | 69 +++++
.../dto/stats/PartitionStatisticsDTO.java | 85 ++++++
.../dto/stats/PartitionStatisticsDropDTO.java | 89 ++++++
.../dto/stats/PartitionStatisticsUpdateDTO.java | 91 ++++++
.../dto/stats/TestPartitionStatisticsDTO.java | 57 ++++
.../main/java/org/apache/gravitino/Configs.java | 9 +
.../java/org/apache/gravitino/GravitinoEnv.java | 10 +-
.../apache/gravitino/stats/StatisticManager.java | 154 +++++++++-
.../MemoryPartitionStatsStorageFactory.java | 71 ++++-
.../stats/storage/PartitionStatisticStorage.java | 17 +-
.../storage/PersistedPartitionStatistics.java | 25 +-
.../stats/storage/PersistedStatistic.java | 82 +++++
.../gravitino/stats/TestStatisticManager.java | 108 ++++++-
.../storage/TestMemoryPartitionStatsStorage.java | 76 ++++-
.../server/web/rest/ExceptionHandlers.java | 34 +++
.../server/web/rest/StatisticOperations.java | 256 ++++++++++++++++
.../server/web/rest/TestStatisticOperations.java | 340 ++++++++++++++++++++-
24 files changed, 1786 insertions(+), 111 deletions(-)
diff --git
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
index ee98b84036..84cbf590f8 100644
---
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
+++
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
@@ -22,7 +22,7 @@ import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
/** An exception thrown when statistic has an illegal name */
-public class IllegalStatisticNameException extends GravitinoRuntimeException {
+public class IllegalStatisticNameException extends IllegalArgumentException {
/**
* Constructs a new exception with the specified detail message.
*
@@ -31,7 +31,7 @@ public class IllegalStatisticNameException extends
GravitinoRuntimeException {
*/
@FormatMethod
public IllegalStatisticNameException(@FormatString String message, Object...
args) {
- super(message, args);
+ super(String.format(message, args));
}
/**
@@ -44,6 +44,6 @@ public class IllegalStatisticNameException extends
GravitinoRuntimeException {
@FormatMethod
public IllegalStatisticNameException(
Throwable cause, @FormatString String message, Object... args) {
- super(cause, message, args);
+ super(String.format(message, args), cause);
}
}
diff --git
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
index f44be6c953..356d9e5b4a 100644
--- a/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
+++ b/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
@@ -26,7 +26,7 @@ public interface PartitionStatistics {
*
* @return the name of the partition
*/
- String name();
+ String partitionName();
/**
* Returns the statistics for the partition.
diff --git
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
index 4e4cc1b170..c9acfe35db 100644
--- a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
+++ b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
@@ -25,44 +25,19 @@ import java.util.List;
* source. It is used to manage and track the statistics that are relevant
when partitions are
* dropped.
*/
-public class PartitionStatisticsDrop {
-
- private final String partitionName;
- private final List<String> statisticNames;
-
- /**
- * Creates a PartitionDropStatistics instance with the specified partition
name and statistic
- * names.
- *
- * @param partitionName the name of the partition.
- * @param statisticNames a list of statistic names that are relevant to the
partition being
- * dropped.
- * @return a PartitionDropStatistics instance
- */
- public static PartitionStatisticsDrop of(String partitionName, List<String>
statisticNames) {
- return new PartitionStatisticsDrop(partitionName, statisticNames);
- }
-
- private PartitionStatisticsDrop(String partitionName, List<String>
statisticNames) {
- this.partitionName = partitionName;
- this.statisticNames = statisticNames;
- }
+public interface PartitionStatisticsDrop {
/**
* Returns the name of the partition for which these statistics are
applicable.
*
* @return the name of the partition
*/
- public String partitionName() {
- return partitionName;
- }
+ String partitionName();
/**
* Returns the names of the statistics that are relevant to the partition
being dropped.
*
* @return a list of statistic names
*/
- public List<String> statisticNames() {
- return statisticNames;
- }
+ List<String> statisticNames();
}
diff --git
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsModification.java
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsModification.java
new file mode 100644
index 0000000000..ed63374c70
--- /dev/null
+++
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsModification.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PartitionStatisticsModification provides methods to create instances of
PartitionStatisticsDrop
+ * and PartitionStatisticsUpdate, which are used to manage statistics related
to partitions in a
+ * data source.
+ */
+public class PartitionStatisticsModification {
+
+ /**
+ * Creates a PartitionDropStatistics instance with the specified partition
name and statistic
+ * names.
+ *
+ * @param partitionName the name of the partition.
+ * @param statisticNames a list of statistic names that are relevant to the
partition being
+ * dropped.
+ * @return a PartitionDropStatistics instance
+ */
+ public static PartitionStatisticsDrop drop(String partitionName,
List<String> statisticNames) {
+ return new PartitionStatisticsDropImpl(partitionName, statisticNames);
+ }
+
+ /**
+ * Creates a PartitionUpdateStatistics instance with the specified partition
name and statistics.
+ *
+ * @param partitionName the name of the partition
+ * @param statistics a map of statistic names to their values to be updated
+ * @return a PartitionUpdateStatistics instance
+ */
+ public static PartitionStatisticsUpdate update(
+ String partitionName, Map<String, StatisticValue<?>> statistics) {
+ return new PartitionStatisticsUpdateImpl(partitionName, statistics);
+ }
+
+ private static class PartitionStatisticsDropImpl implements
PartitionStatisticsDrop {
+
+ private final String partitionName;
+ private final List<String> statisticNames;
+
+ private PartitionStatisticsDropImpl(String partitionName, List<String>
statisticNames) {
+ this.partitionName = partitionName;
+ this.statisticNames = statisticNames;
+ }
+
+ /**
+ * Returns the name of the partition for which these statistics are
applicable.
+ *
+ * @return the name of the partition
+ */
+ public String partitionName() {
+ return partitionName;
+ }
+
+ /**
+ * Returns the names of the statistics that are relevant to the partition
being dropped.
+ *
+ * @return a list of statistic names
+ */
+ public List<String> statisticNames() {
+ return statisticNames;
+ }
+ }
+
+ private static class PartitionStatisticsUpdateImpl implements
PartitionStatisticsUpdate {
+
+ private final String partitionName;
+ private final Map<String, StatisticValue<?>> statistics;
+
+ /**
+ * Creates a PartitionUpdateStatistics instance with the specified
partition name and
+ * statistics.
+ *
+ * @param partitionName the name of the partition
+ * @param statistics a map of statistic names to their values to be updated
+ * @return a PartitionUpdateStatistics instance
+ */
+ private PartitionStatisticsUpdateImpl(
+ String partitionName, Map<String, StatisticValue<?>> statistics) {
+ this.partitionName = partitionName;
+ this.statistics = statistics;
+ }
+
+ /**
+ * Returns the name of the partition for which these statistics are
applicable.
+ *
+ * @return the name of the partition.
+ */
+ public String partitionName() {
+ return partitionName;
+ }
+
+ /**
+ * Returns the statistics to be updated for the partition.
+ *
+ * @return a map where the key is the statistic name and the value is the
statistic value.
+ */
+ public Map<String, StatisticValue<?>> statistics() {
+ return statistics;
+ }
+ }
+}
diff --git
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
index 7a8f493506..df47af2f5b 100644
---
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
+++
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
@@ -24,44 +24,19 @@ import java.util.Map;
* PartitionUpdateStatistics represents the statistics for a specific
partition that can be updated.
* It contains the partition name and a map of statistic names to their values.
*/
-public class PartitionStatisticsUpdate {
-
- private final String partitionName;
- private final Map<String, StatisticValue<?>> statistics;
-
- /**
- * Creates a PartitionUpdateStatistics instance with the specified partition
name and statistics.
- *
- * @param partitionName the name of the partition
- * @param statistics a map of statistic names to their values to be updated
- * @return a PartitionUpdateStatistics instance
- */
- public static PartitionStatisticsUpdate of(
- String partitionName, Map<String, StatisticValue<?>> statistics) {
- return new PartitionStatisticsUpdate(partitionName, statistics);
- }
-
- private PartitionStatisticsUpdate(
- String partitionName, Map<String, StatisticValue<?>> statistics) {
- this.partitionName = partitionName;
- this.statistics = statistics;
- }
+public interface PartitionStatisticsUpdate {
/**
* Returns the name of the partition for which these statistics are
applicable.
*
* @return the name of the partition.
*/
- public String partitionName() {
- return partitionName;
- }
+ String partitionName();
/**
* Returns the statistics to be updated for the partition.
*
* @return a map where the key is the statistic name and the value is the
statistic value.
*/
- public Map<String, StatisticValue<?>> statistics() {
- return statistics;
- }
+ Map<String, StatisticValue<?>> statistics();
}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsDropRequest.java
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsDropRequest.java
new file mode 100644
index 0000000000..3a571242e7
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsDropRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Request to drop partition statistics. */
+@Getter
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsDropRequest implements RESTRequest {
+
+ @JsonProperty("drops")
+ private List<PartitionStatisticsDropDTO> drops;
+
+ /**
+ * Creates a new PartitionStatsDropRequest.
+ *
+ * @param drops The partition statistics to drop.
+ */
+ public PartitionStatisticsDropRequest(List<PartitionStatisticsDropDTO>
drops) {
+ this.drops = drops;
+ }
+
+ /** This is the constructor that is used by Jackson deserializer */
+ public PartitionStatisticsDropRequest() {
+ this(null);
+ }
+
+ /**
+ * Validates the request.
+ *
+ * @throws IllegalArgumentException If the request is invalid, this
exception is thrown.
+ */
+ @Override
+ public void validate() throws IllegalArgumentException {
+ Preconditions.checkArgument(
+ drops != null && !drops.isEmpty(), "\"drops\" must not be null or
empty.");
+ for (PartitionStatisticsDropDTO drop : drops) {
+ drop.validate();
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsUpdateRequest.java
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsUpdateRequest.java
new file mode 100644
index 0000000000..5ef9d92f4d
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsUpdateRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Represents a request to update partition statistics. */
+@Getter
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsUpdateRequest implements RESTRequest {
+
+ @JsonProperty("updates")
+ private final List<PartitionStatisticsUpdateDTO> updates;
+
+ /**
+ * Creates a new PartitionStatsUpdateRequest.
+ *
+ * @param updates The updates to apply to the partition statistics.
+ */
+ public PartitionStatisticsUpdateRequest(List<PartitionStatisticsUpdateDTO>
updates) {
+ this.updates = updates;
+ }
+
+ /** Default constructor for PartitionStatsUpdateRequest. (Used for Jackson
deserialization.) */
+ public PartitionStatisticsUpdateRequest() {
+ this(null);
+ }
+
+ /**
+ * Validates the request.
+ *
+ * @throws IllegalArgumentException If the request is invalid, this
exception is thrown.
+ */
+ @Override
+ public void validate() throws IllegalArgumentException {
+ Preconditions.checkArgument(
+ updates != null && !updates.isEmpty(), "\"updates\" must not be null
or empty.");
+ for (PartitionStatisticsUpdateDTO update : updates) {
+ update.validate();
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/responses/PartitionStatisticsListResponse.java
b/common/src/main/java/org/apache/gravitino/dto/responses/PartitionStatisticsListResponse.java
new file mode 100644
index 0000000000..d22240bf0c
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/responses/PartitionStatisticsListResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDTO;
+
+/**
+ * PartitionStatisticsListResponse is a response object that contains an array
of
+ * PartitionStatisticsDTO.
+ */
+@Getter
+@ToString
+@EqualsAndHashCode(callSuper = true)
+public class PartitionStatisticsListResponse extends BaseResponse {
+
+ @JsonProperty("partitionStatistics")
+ private PartitionStatisticsDTO[] partitionStatistics;
+
+ /**
+ * Creates a new PartitionStatsListResponse.
+ *
+ * @param partitionStatistics The updated statistics for the partition.
+ */
+ public PartitionStatisticsListResponse(PartitionStatisticsDTO[]
partitionStatistics) {
+ super(0);
+ this.partitionStatistics = partitionStatistics;
+ }
+
+ /** This is the constructor that is used by Jackson deserializer */
+ public PartitionStatisticsListResponse() {
+ this(null);
+ }
+
+ /**
+ * Validates the response.
+ *
+ * @throws IllegalArgumentException If the response is invalid, this
exception is thrown.
+ */
+ @Override
+ public void validate() throws IllegalArgumentException {
+ super.validate();
+ Preconditions.checkArgument(
+ partitionStatistics != null, "\"partitionStatistics\" must not be
null");
+ for (PartitionStatisticsDTO partitionStat : partitionStatistics) {
+ partitionStat.validate();
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDTO.java
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDTO.java
new file mode 100644
index 0000000000..3b48f3df3b
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDTO.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.Statistic;
+
+/**
+ * PartitionStatisticsDTO is a Data Transfer Object (DTO) that represents the
statistics for a
+ * specific partition in a data source.
+ */
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsDTO implements PartitionStatistics {
+
+ @JsonProperty("partitionName")
+ private String partitionName;
+
+ @JsonProperty("statistics")
+ private StatisticDTO[] statistics;
+
+ /** Default constructor for Jackson. */
+ protected PartitionStatisticsDTO() {
+ this(null, null);
+ }
+
+ private PartitionStatisticsDTO(String partitionName, StatisticDTO[]
statistics) {
+ this.partitionName = partitionName;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public String partitionName() {
+ return partitionName;
+ }
+
+ @Override
+ public Statistic[] statistics() {
+ return statistics;
+ }
+
+ /** Validates the PartitionStatisticsDTO instance. */
+ public void validate() {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(partitionName), "\"partitionName\" must not be
null or empty");
+ Preconditions.checkArgument(statistics != null, "\"statistics\" must not
be null");
+ for (StatisticDTO statistic : statistics) {
+ statistic.validate();
+ }
+ }
+
+ /**
+ * Creates a new instance of PartitionStatisticsDTO.
+ *
+ * @param partitionName the name of the partition for which these statistics
are applicable
+ * @param statistics the statistics applicable to the partition
+ * @return a new instance of PartitionStatisticsDTO
+ */
+ public static PartitionStatisticsDTO of(String partitionName, StatisticDTO[]
statistics) {
+ PartitionStatisticsDTO dto = new PartitionStatisticsDTO(partitionName,
statistics);
+ dto.validate();
+ return dto;
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDropDTO.java
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDropDTO.java
new file mode 100644
index 0000000000..8004d8d7c4
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDropDTO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+
+/**
+ * PartitionStatisticsDropDTO is a Data Transfer Object (DTO) that represents
the request to drop
+ * statistics for a specific partition in a data source.
+ */
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsDropDTO implements PartitionStatisticsDrop {
+
+ @JsonProperty("partitionName")
+ private final String partitionName;
+
+ @JsonProperty("statisticNames")
+ private final List<String> statisticNames;
+
+ /** Default constructor for Jackson. */
+ protected PartitionStatisticsDropDTO() {
+ this(null, null);
+ }
+
+ private PartitionStatisticsDropDTO(String partitionName, List<String>
statisticNames) {
+ this.partitionName = partitionName;
+ this.statisticNames = statisticNames;
+ }
+
+ @Override
+ public String partitionName() {
+ return partitionName;
+ }
+
+ @Override
+ public List<String> statisticNames() {
+ return statisticNames;
+ }
+
+ /** Validates the PartitionStatisticsDropDTO instance. */
+ public void validate() {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(partitionName), "\"partitionName\" must not be
null or empty");
+ Preconditions.checkArgument(
+ statisticNames != null && !statisticNames.isEmpty(),
+ "\"statisticNames\" must not be null or empty");
+ for (String statisticName : statisticNames) {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(statisticName),
+ "Each statistic \"name\" in \"statisticNames\" must not be null or
empty");
+ }
+ }
+
+ /**
+ * Creates a new instance of PartitionStatisticsDropDTO.
+ *
+ * @param partitionName the name of the partition for which these statistics
are applicable
+ * @param statisticNames the names of the statistics to drop for the
partition
+ * @return a new instance of PartitionStatisticsDropDTO
+ */
+ public static PartitionStatisticsDropDTO of(String partitionName,
List<String> statisticNames) {
+ PartitionStatisticsDropDTO dto = new
PartitionStatisticsDropDTO(partitionName, statisticNames);
+ dto.validate();
+ return dto;
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsUpdateDTO.java
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsUpdateDTO.java
new file mode 100644
index 0000000000..edaf035e28
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsUpdateDTO.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+
+/**
+ * PartitionStatisticsUpdateDTO is a Data Transfer Object (DTO) that
represents the request to
+ * update statistics for a specific partition in a data source.
+ */
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsUpdateDTO implements PartitionStatisticsUpdate
{
+
+ @JsonProperty("partitionName")
+ private final String partitionName;
+
+ @JsonProperty("statistics")
+ @JsonSerialize(contentUsing = JsonUtils.StatisticValueSerializer.class)
+ @JsonDeserialize(contentUsing = JsonUtils.StatisticValueDeserializer.class)
+ private final Map<String, StatisticValue<?>> statistics;
+
+ /** Default constructor for Jackson. */
+ protected PartitionStatisticsUpdateDTO() {
+ this(null, null);
+ }
+
+ private PartitionStatisticsUpdateDTO(
+ String partitionName, Map<String, StatisticValue<?>> statistics) {
+ this.partitionName = partitionName;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public String partitionName() {
+ return partitionName;
+ }
+
+ @Override
+ public Map<String, StatisticValue<?>> statistics() {
+ return statistics;
+ }
+
+ /** Validates the PartitionStatisticsUpdateDTO instance. */
+ public void validate() {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(partitionName), "\"partitionName\" must not be
null or empty");
+ Preconditions.checkArgument(
+ statistics != null && !statistics.isEmpty(), "\"statistics\" must not
be null or empty");
+ }
+
+ /**
+ * Creates a new instance of PartitionStatisticsUpdateDTO.
+ *
+ * @param partitionName the name of the partition for which these statistics
are applicable
+ * @param statistics the statistics applicable to the partition
+ * @return a new instance of PartitionStatisticsUpdateDTO
+ */
+ public static PartitionStatisticsUpdateDTO of(
+ String partitionName, Map<String, StatisticValue<?>> statistics) {
+ PartitionStatisticsUpdateDTO dto = new
PartitionStatisticsUpdateDTO(partitionName, statistics);
+ dto.validate();
+ return dto;
+ }
+}
diff --git
a/common/src/test/java/org/apache/gravitino/dto/stats/TestPartitionStatisticsDTO.java
b/common/src/test/java/org/apache/gravitino/dto/stats/TestPartitionStatisticsDTO.java
new file mode 100644
index 0000000000..ef770cb640
--- /dev/null
+++
b/common/src/test/java/org/apache/gravitino/dto/stats/TestPartitionStatisticsDTO.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPartitionStatisticsDTO {
+
+ @Test
+ public void testPartitionStatisticsDTO() throws JsonProcessingException {
+ StatisticDTO statisticDTO =
+ StatisticDTO.builder()
+ .withName("statistic_test")
+ .withValue(Optional.of(StatisticValues.longValue(100L)))
+ .withReserved(false)
+ .withModifiable(false)
+ .withAudit(
+ AuditDTO.builder()
+ .withCreator("test_user")
+ .withCreateTime(Instant.now())
+ .withLastModifier("test_user")
+ .withLastModifiedTime(Instant.now())
+ .build())
+ .build();
+
+ PartitionStatisticsDTO partitionStatisticsDTO =
+ PartitionStatisticsDTO.of("test_partition", new StatisticDTO[]
{statisticDTO});
+
+ String serJson =
JsonUtils.objectMapper().writeValueAsString(partitionStatisticsDTO);
+ PartitionStatisticsDTO deserPartitionStatisticsDTO =
+ JsonUtils.objectMapper().readValue(serJson,
PartitionStatisticsDTO.class);
+ Assertions.assertEquals(partitionStatisticsDTO,
deserPartitionStatisticsDTO);
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 829f6330bf..63681fec1d 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -28,6 +28,7 @@ import org.apache.gravitino.audit.v2.SimpleFormatterV2;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
+import org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory;
public class Configs {
@@ -445,4 +446,12 @@ public class Configs {
.longConf()
.checkValue(value -> value > 0,
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
.createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
+
+ // TODO: Change default value to a Lance partition statistics storage
factory class
+ public static final ConfigEntry<String>
PARTITION_STATS_STORAGE_FACTORY_CLASS =
+ new ConfigBuilder("gravitino.stats.partition.storageFactoryClass")
+ .doc("The partition stats storage factory class.")
+ .version(ConfigConstants.VERSION_1_0_0)
+ .stringConf()
+
.createWithDefault(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 591542a36d..7b5929d5e5 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -472,6 +472,14 @@ public class GravitinoEnv {
}
}
+ if (statisticManager != null) {
+ try {
+ statisticManager.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close StatisticManager", e);
+ }
+ }
+
LOG.info("Gravitino Environment is shut down.");
}
@@ -563,7 +571,7 @@ public class GravitinoEnv {
ModelNormalizeDispatcher modelNormalizeDispatcher =
new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
this.modelDispatcher = new ModelEventDispatcher(eventBus,
modelNormalizeDispatcher);
- this.statisticManager = new StatisticManager(entityStore, idGenerator);
+ this.statisticManager = new StatisticManager(entityStore, idGenerator,
config);
// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
diff --git
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
index b62527dca6..44d1a80b76 100644
--- a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.stats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
@@ -28,6 +29,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Audit;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.MetadataObject;
@@ -40,6 +43,11 @@ import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.stats.storage.MetadataObjectStatisticsDrop;
+import org.apache.gravitino.stats.storage.MetadataObjectStatisticsUpdate;
+import org.apache.gravitino.stats.storage.PartitionStatisticStorage;
+import org.apache.gravitino.stats.storage.PartitionStatisticStorageFactory;
+import org.apache.gravitino.stats.storage.PersistedPartitionStatistics;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.utils.Executable;
import org.apache.gravitino.utils.MetadataObjectUtil;
@@ -48,17 +56,34 @@ import org.apache.gravitino.utils.PrincipalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StatisticManager {
+public class StatisticManager implements Closeable {
+ private static final String OPTIONS_PREFIX =
"gravitino.stats.partition.option.";
private static final Logger LOG =
LoggerFactory.getLogger(StatisticManager.class);
private final EntityStore store;
private final IdGenerator idGenerator;
+ private final PartitionStatisticStorage partitionStorage;
- public StatisticManager(EntityStore store, IdGenerator idGenerator) {
+ public StatisticManager(EntityStore store, IdGenerator idGenerator, Config
config) {
this.store = store;
this.idGenerator = idGenerator;
+ String className =
config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS);
+ Map<String, String> options = config.getConfigsWithPrefix(OPTIONS_PREFIX);
+ try {
+ PartitionStatisticStorageFactory factory =
+ (PartitionStatisticStorageFactory)
+ Class.forName(className).getDeclaredConstructor().newInstance();
+ this.partitionStorage = factory.create(options);
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to create and initialize partition statistics storage
factory by name {}.",
+ className,
+ e);
+ throw new RuntimeException(
+ "Failed to create and initialize partition statistics storage
factory: " + className, e);
+ }
}
public List<Statistic> listStatistics(String metalake, MetadataObject
metadataObject) {
@@ -182,9 +207,110 @@ public class StatisticManager {
}
}
+ public boolean dropPartitionStatistics(
+ String metalake,
+ MetadataObject metadataObject,
+ List<PartitionStatisticsDrop> partitionStatistics) {
+ try {
+ List<MetadataObjectStatisticsDrop> partitionStatisticsToDrop =
+ Lists.newArrayList(MetadataObjectStatisticsDrop.of(metadataObject,
partitionStatistics));
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+
+ return TreeLockUtils.doWithTreeLock(
+ identifier,
+ LockType.WRITE,
+ () -> partitionStorage.dropStatistics(metalake,
partitionStatisticsToDrop))
+ != 0;
+ } catch (IOException ioe) {
+ LOG.error(
+ "Failed to drop partition statistics for {} in metalake {}.",
+ metadataObject,
+ metalake,
+ ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ public void updatePartitionStatistics(
+ String metalake,
+ MetadataObject metadataObject,
+ List<PartitionStatisticsUpdate> partitionStatistics) {
+ try {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+
+ List<MetadataObjectStatisticsUpdate> statisticsToUpdate =
Lists.newArrayList();
+ statisticsToUpdate.add(
+ MetadataObjectStatisticsUpdate.of(metadataObject,
partitionStatistics));
+ TreeLockUtils.doWithTreeLock(
+ identifier,
+ LockType.WRITE,
+ (Executable<Void, IOException>)
+ () -> {
+ partitionStorage.updateStatistics(metalake,
statisticsToUpdate);
+ return null;
+ });
+ } catch (IOException ioe) {
+ LOG.error(
+ "Failed to update partition statistics for {} in metalake {}.",
+ metadataObject,
+ metalake,
+ ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ public List<PartitionStatistics> listPartitionStatistics(
+ String metalake, MetadataObject metadataObject, PartitionRange range) {
+ try {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+
+ List<PersistedPartitionStatistics> partitionStats =
+ TreeLockUtils.doWithTreeLock(
+ identifier,
+ LockType.READ,
+ () -> partitionStorage.listStatistics(metalake, metadataObject,
range));
+
+ List<PartitionStatistics> listedStats = Lists.newArrayList();
+
+ if (partitionStats == null || partitionStats.isEmpty()) {
+ return listedStats;
+ }
+
+ return partitionStats.stream()
+ .map(
+ partitionStat -> {
+ String partitionName = partitionStat.partitionName();
+ Statistic[] statistics =
+ partitionStat.statistics().stream()
+ .map(
+ entry -> {
+ String statName = entry.name();
+ StatisticValue<?> value = entry.value();
+ AuditInfo auditInfo = entry.auditInfo();
+ return new CustomStatistic(statName, value,
auditInfo);
+ })
+ .toArray(Statistic[]::new);
+
+ return new CustomPartitionStatistic(partitionName, statistics);
+ })
+ .collect(Collectors.toList());
+ } catch (IOException ioe) {
+ LOG.error(
+ "Failed to list partition statistics for {} in metalake {}.",
+ metadataObject,
+ metalake,
+ ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ partitionStorage.close();
+ }
+
@VisibleForTesting
public static class CustomStatistic implements Statistic {
-
private final String name;
private final StatisticValue<?> value;
private final Audit auditInfo;
@@ -220,4 +346,26 @@ public class StatisticManager {
return auditInfo;
}
}
+
+ @VisibleForTesting
+ public static class CustomPartitionStatistic implements PartitionStatistics {
+
+ private final String partitionName;
+ private final Statistic[] statistics;
+
+ public CustomPartitionStatistic(String partitionName, Statistic[]
statistics) {
+ this.partitionName = partitionName;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public String partitionName() {
+ return partitionName;
+ }
+
+ @Override
+ public Statistic[] statistics() {
+ return statistics;
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
index 97f1297385..b3684118f4 100644
---
a/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
@@ -20,18 +20,22 @@ package org.apache.gravitino.stats.storage;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.io.IOException;
-import java.util.HashMap;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.stats.PartitionRange;
import org.apache.gravitino.stats.PartitionStatisticsDrop;
import org.apache.gravitino.stats.PartitionStatisticsUpdate;
import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.utils.PrincipalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +67,7 @@ public class MemoryPartitionStatsStorageFactory implements
PartitionStatisticSto
return Lists.newArrayList();
}
- Map<String, Map<String, StatisticValue<?>>> resultStats =
Maps.newHashMap();
+ Map<String, List<PersistedStatistic>> resultStats = Maps.newHashMap();
for (PersistedPartitionStatistics partitionStat :
tableStats.partitionStatistics().values()) {
String partitionName = partitionStat.partitionName();
boolean lowerBoundSatisfied =
@@ -81,7 +85,7 @@ public class MemoryPartitionStatsStorageFactory implements
PartitionStatisticSto
BoundDirection.UPPER);
if (lowerBoundSatisfied && upperBoundSatisfied) {
- resultStats.put(partitionName,
Maps.newHashMap(partitionStat.statistics()));
+ resultStats.put(partitionName,
Lists.newArrayList(partitionStat.statistics()));
}
}
return resultStats.entrySet().stream()
@@ -121,11 +125,45 @@ public class MemoryPartitionStatsStorageFactory
implements PartitionStatisticSto
.partitionStatistics()
.computeIfAbsent(
partitionName,
- k -> PersistedPartitionStatistics.of(partitionName, new
HashMap<>()));
+ k -> PersistedPartitionStatistics.of(partitionName,
Lists.newArrayList()));
+
+ Set<String> existedStats = Sets.newHashSet();
+ // Update existed stats
+ existedPartitionStats
+ .statistics()
+ .replaceAll(
+ stat -> {
+ String statName = stat.name();
+ if (partitionStats.containsKey(statName)) {
+ existedStats.add(statName);
+ StatisticValue<?> newValue =
partitionStats.get(statName);
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreator(stat.auditInfo().creator())
+ .withCreateTime(stat.auditInfo().createTime())
+ .withLastModifiedTime(Instant.now())
+
.withLastModifier(PrincipalUtils.getCurrentUserName())
+ .build();
+ return PersistedStatistic.of(statName, newValue,
auditInfo);
+ }
+ return stat;
+ });
+
+ // Add new stats
for (Map.Entry<String, StatisticValue<?>> statEntry :
partitionStats.entrySet()) {
String statName = statEntry.getKey();
- StatisticValue<?> statValue = statEntry.getValue();
- existedPartitionStats.statistics().put(statName, statValue);
+ if (!existedStats.contains(statName)) {
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .withLastModifiedTime(Instant.now())
+ .withLastModifier(PrincipalUtils.getCurrentUserName())
+ .build();
+ PersistedStatistic newStat =
+ PersistedStatistic.of(statName, statEntry.getValue(),
auditInfo);
+ existedPartitionStats.statistics().add(newStat);
+ }
}
}
}
@@ -153,14 +191,15 @@ public class MemoryPartitionStatsStorageFactory
implements PartitionStatisticSto
if
(tableStats.partitionStatistics().containsKey(partStats.partitionName())) {
PersistedPartitionStatistics persistedPartitionStatistics =
tableStats.partitionStatistics().get(partStats.partitionName());
- for (String statName : partStats.statisticNames()) {
- Map<String, StatisticValue<?>> statisticValueMap =
- persistedPartitionStatistics.statistics();
- if (statisticValueMap.containsKey(statName)) {
- statisticValueMap.remove(statName);
- deleteCount++;
- }
- }
+ Set<String> statsNamesToDelete =
Sets.newHashSet(partStats.statisticNames());
+ int originCount = persistedPartitionStatistics.statistics().size();
+ persistedPartitionStatistics
+ .statistics()
+ .removeIf(
+ persistedStatistic ->
statsNamesToDelete.contains(persistedStatistic.name()));
+
+ deleteCount =
+ deleteCount + (originCount -
persistedPartitionStatistics.statistics().size());
if (persistedPartitionStatistics.statistics().isEmpty()) {
tableStats.partitionStatistics().remove(partStats.partitionName());
}
@@ -223,7 +262,7 @@ public class MemoryPartitionStatsStorageFactory implements
PartitionStatisticSto
boolean compare(
String targetPartitionName, String partitionName,
PartitionRange.BoundType type) {
int result = targetPartitionName.compareTo(partitionName);
- return type == PartitionRange.BoundType.OPEN ? result > 0 : result
>= 0;
+ return type == PartitionRange.BoundType.OPEN ? result < 0 : result
<= 0;
}
},
UPPER {
@@ -231,7 +270,7 @@ public class MemoryPartitionStatsStorageFactory implements
PartitionStatisticSto
boolean compare(
String targetPartitionName, String partitionName,
PartitionRange.BoundType type) {
int result = targetPartitionName.compareTo(partitionName);
- return type == PartitionRange.BoundType.OPEN ? result < 0 : result
<= 0;
+ return type == PartitionRange.BoundType.OPEN ? result > 0 : result
>= 0;
}
};
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
index ca64fa48c7..64acf8c2fe 100644
---
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.stats.storage;
import java.io.Closeable;
+import java.io.IOException;
import java.util.List;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.stats.PartitionRange;
@@ -39,7 +40,8 @@ public interface PartitionStatisticStorage extends Closeable {
* name
*/
List<PersistedPartitionStatistics> listStatistics(
- String metalake, MetadataObject metadataObject, PartitionRange
partitionRange);
+ String metalake, MetadataObject metadataObject, PartitionRange
partitionRange)
+ throws IOException;
/**
* Lists statistics for a given metadata object and specific partition
names. This interface may
@@ -55,7 +57,8 @@ public interface PartitionStatisticStorage extends Closeable {
* name
*/
default List<PersistedPartitionStatistics> listStatistics(
- String metalake, MetadataObject metadataObject, List<String>
partitionNames) {
+ String metalake, MetadataObject metadataObject, List<String>
partitionNames)
+ throws IOException {
throw new UnsupportedOperationException(
"Don't support listStatistics with partition names yet.");
}
@@ -73,19 +76,21 @@ public interface PartitionStatisticStorage extends
Closeable {
* @return the number of statistics dropped, which may be less than the size
of the input list if
* some statistics do not exist or cannot be dropped.
*/
- int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop>
partitionStatisticsToDrop);
+ int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop>
partitionStatisticsToDrop)
+ throws IOException;
/**
* Updates statistics for a given metadata object. If the statistic exists,
it will be updated; If
* the statistic doesn't exist, it will be created. Locking guarantee: The
upper layer will
* acquire a write lock at the metadata object level. For example, if the
metadata object is a
- * table, the write lock of the table level will be held. The concrete
implementation * may
- * perform partial drops, meaning that the underlying storage system may not
support transactional
+ * table, the write lock of the table level will be held. The concrete
implementation may perform
+ * partial updates, meaning that the underlying storage system may not
support transactional
* update.
*
* @param metalake the name of the metalake
* @param statisticsToUpdate a list of {@link
MetadataObjectStatisticsUpdate} objects, each
* containing the metadata object and its associated statistics updates.
*/
- void updateStatistics(String metalake, List<MetadataObjectStatisticsUpdate>
statisticsToUpdate);
+ void updateStatistics(String metalake, List<MetadataObjectStatisticsUpdate>
statisticsToUpdate)
+ throws IOException;
}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
index 0e3b647ae6..241ec6084e 100644
---
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
@@ -18,25 +18,24 @@
*/
package org.apache.gravitino.stats.storage;
-import java.util.Map;
-import org.apache.gravitino.stats.StatisticValue;
+import java.util.List;
/** Represents a collection of statistics for a specific partition in a
metadata object. */
public class PersistedPartitionStatistics {
private final String partitionName;
- private final Map<String, StatisticValue<?>> statistics;
+ private final List<PersistedStatistic> statistics;
/**
* Creates an instance of {@link PersistedPartitionStatistics}.
*
* @param partitionName the name of the partition for which these statistics
are applicable
- * @param statistics a map of statistics applicable to the partition, where
the key is the
- * statistic name
+ * @param statistics a list of statistics applicable to the partition, where
each statistic is
+ * represented by a {@link PersistedStatistic} object
* @return a new instance of {@link PersistedPartitionStatistics}
*/
public static PersistedPartitionStatistics of(
- String partitionName, Map<String, StatisticValue<?>> statistics) {
+ String partitionName, List<PersistedStatistic> statistics) {
return new PersistedPartitionStatistics(partitionName, statistics);
}
@@ -44,11 +43,10 @@ public class PersistedPartitionStatistics {
* Private constructor for {@link PersistedPartitionStatistics}.
*
* @param partitionName the name of the partition for which these statistics
are applicable
- * @param statistics a map of statistics applicable to the partition, where
the key is the
- * statistic name
+ * @param statistics a list of statistics applicable to the partition, where
each statistic is
+ * represented by a {@link PersistedStatistic} object
*/
- private PersistedPartitionStatistics(
- String partitionName, Map<String, StatisticValue<?>> statistics) {
+ private PersistedPartitionStatistics(String partitionName,
List<PersistedStatistic> statistics) {
this.partitionName = partitionName;
this.statistics = statistics;
}
@@ -63,11 +61,12 @@ public class PersistedPartitionStatistics {
}
/**
- * Returns the statistics for the partition.
+ * Returns the statistics applicable to the partition.
*
- * @return a map of statistics applicable to the partition, where the key is
the statistic name
+ * @return a list of {@link PersistedStatistic} objects, each representing a
statistic for the
+ * partition
*/
- public Map<String, StatisticValue<?>> statistics() {
+ public List<PersistedStatistic> statistics() {
return statistics;
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedStatistic.java
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedStatistic.java
new file mode 100644
index 0000000000..2c87dc3bcd
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedStatistic.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.stats.StatisticValue;
+
+/** Represents a persisted statistic with its name, value, and associated
audit information. */
+public class PersistedStatistic {
+
+ private final String name;
+ private final StatisticValue<?> value;
+ private final AuditInfo auditInfo;
+
+ /**
+ * Creates an instance of {@link PersistedStatistic}.
+ *
+ * @param name the name of the statistic
+ * @param value the value of the statistic
+ * @param auditInfo the audit information associated with the statistic
+ * @return a new instance of {@link PersistedStatistic}
+ */
+ public static PersistedStatistic of(String name, StatisticValue<?> value,
AuditInfo auditInfo) {
+ return new PersistedStatistic(name, value, auditInfo);
+ }
+
+ /**
+ * Private constructor for {@link PersistedStatistic}.
+ *
+ * @param name the name of the statistic
+ * @param value the value of the statistic
+ * @param auditInfo the audit information associated with the statistic
+ */
+ private PersistedStatistic(String name, StatisticValue<?> value, AuditInfo
auditInfo) {
+ this.name = name;
+ this.value = value;
+ this.auditInfo = auditInfo;
+ }
+
+ /**
+ * Returns the name of the statistic.
+ *
+ * @return the name of the statistic
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the value of the statistic.
+ *
+ * @return the value of the statistic
+ */
+ public StatisticValue<?> value() {
+ return value;
+ }
+
+ /**
+ * Returns the audit information associated with the statistic.
+ *
+ * @return the audit information
+ */
+ public AuditInfo auditInfo() {
+ return auditInfo;
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
index e430d9486c..03cb28245d 100644
--- a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
+++ b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
@@ -63,6 +63,7 @@ import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.SchemaVersion;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.junit.jupiter.api.AfterAll;
@@ -115,6 +116,8 @@ public class TestStatisticManager {
Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+ Mockito.when(config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS))
+
.thenReturn(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
@@ -193,7 +196,7 @@ public class TestStatisticManager {
@Test
public void testStatisticLifeCycle() {
- StatisticManager statisticManager = new StatisticManager(entityStore,
idGenerator);
+ StatisticManager statisticManager = new StatisticManager(entityStore,
idGenerator, config);
MetadataObject tableObject =
MetadataObjects.of(Lists.newArrayList(CATALOG, SCHEMA, TABLE),
MetadataObject.Type.TABLE);
@@ -284,4 +287,107 @@ public class TestStatisticManager {
NoSuchMetadataObjectException.class,
() -> statisticManager.dropStatistics(METALAKE, notExistObject,
statNames));
}
+
+ @Test
+ public void testPartitionStatisticLifeCycle() {
+ StatisticManager statisticManager = new StatisticManager(entityStore,
idGenerator, config);
+
+ MetadataObject tableObject =
+ MetadataObjects.of(Lists.newArrayList(CATALOG, SCHEMA, TABLE),
MetadataObject.Type.TABLE);
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+ // Update statistics
+ stats.put("a", StatisticValues.stringValue("1"));
+ stats.put("b", StatisticValues.longValue(1L));
+ stats.put("c", StatisticValues.doubleValue(1.0));
+ stats.put("d", StatisticValues.booleanValue(true));
+ stats.put(
+ "e",
+ StatisticValues.listValue(
+ Lists.newArrayList(
+ StatisticValues.stringValue("1"),
StatisticValues.stringValue("2"))));
+
+ Map<String, StatisticValue<?>> map = Maps.newHashMap();
+ map.put("x", StatisticValues.stringValue("1"));
+ map.put("y", StatisticValues.longValue(2L));
+ stats.put("f", StatisticValues.objectValue(map));
+ List<PartitionStatisticsUpdate> partitionStatistics = Lists.newArrayList();
+ partitionStatistics.add(PartitionStatisticsModification.update("p0",
stats));
+ statisticManager.updatePartitionStatistics(METALAKE, tableObject,
partitionStatistics);
+
+ List<PartitionStatistics> statistics =
+ statisticManager.listPartitionStatistics(
+ METALAKE,
+ tableObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.CLOSED, "p1",
PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, statistics.size());
+ PartitionStatistics listedPartitionStats = statistics.get(0);
+ Assertions.assertEquals(6, listedPartitionStats.statistics().length);
+ for (Statistic statistic : listedPartitionStats.statistics()) {
+ Assertions.assertTrue(
+ stats.containsKey(statistic.name()),
+ "Statistic name should be in the updated stats: " +
statistic.name());
+ StatisticValue<?> value = stats.get(statistic.name());
+ Assertions.assertEquals(
+ value, statistic.value().get(), "Statistic value type mismatch: " +
statistic.name());
+ }
+
+ // Update partial statistics
+ Map<String, StatisticValue<?>> expectStats = Maps.newHashMap();
+ expectStats.putAll(stats);
+ stats.clear();
+ stats.put("f", StatisticValues.longValue(2L));
+ stats.put("x", StatisticValues.longValue(2L));
+ partitionStatistics.clear();
+ partitionStatistics.add(PartitionStatisticsModification.update("p0",
stats));
+
+ expectStats.put("f", StatisticValues.longValue(2L));
+ expectStats.put("x", StatisticValues.longValue(2));
+
+ statisticManager.updatePartitionStatistics(METALAKE, tableObject,
partitionStatistics);
+ statistics =
+ statisticManager.listPartitionStatistics(
+ METALAKE,
+ tableObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.CLOSED, "p1",
PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, statistics.size());
+ listedPartitionStats = statistics.get(0);
+ Assertions.assertEquals(7, listedPartitionStats.statistics().length);
+ for (Statistic statistic : listedPartitionStats.statistics()) {
+ Assertions.assertTrue(
+ expectStats.containsKey(statistic.name()),
+ "Statistic name should be in the updated stats: " +
statistic.name());
+ StatisticValue<?> value = expectStats.get(statistic.name());
+ Assertions.assertEquals(
+ value, statistic.value().get(), "Statistic value type mismatch: " +
statistic.name());
+ }
+
+ // Drop statistics
+ expectStats.remove("a");
+ expectStats.remove("b");
+ expectStats.remove("c");
+ List<String> statNames = Lists.newArrayList("a", "b", "c");
+ List<PartitionStatisticsDrop> partitionStatisticsToDrop =
Lists.newArrayList();
+ partitionStatisticsToDrop.add(PartitionStatisticsModification.drop("p0",
statNames));
+ statisticManager.dropPartitionStatistics(METALAKE, tableObject,
partitionStatisticsToDrop);
+ statistics =
+ statisticManager.listPartitionStatistics(
+ METALAKE,
+ tableObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.CLOSED, "p1",
PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, statistics.size());
+ listedPartitionStats = statistics.get(0);
+ Assertions.assertEquals(4, listedPartitionStats.statistics().length);
+
+ for (Statistic statistic : listedPartitionStats.statistics()) {
+ Assertions.assertTrue(
+ expectStats.containsKey(statistic.name()),
+ "Statistic name should be in the updated stats: " +
statistic.name());
+ StatisticValue<?> value = expectStats.get(statistic.name());
+ Assertions.assertEquals(
+ value, statistic.value().get(), "Statistic value type mismatch: " +
statistic.name());
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
index 4e60ff4dec..980b7f22ca 100644
---
a/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
@@ -27,6 +27,7 @@ import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.stats.PartitionRange;
import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
import org.apache.gravitino.stats.PartitionStatisticsUpdate;
import org.apache.gravitino.stats.StatisticValue;
import org.apache.gravitino.stats.StatisticValues;
@@ -52,25 +53,86 @@ public class TestMemoryPartitionStatsStorage {
Map<String, StatisticValue<?>> statistics = Maps.newHashMap();
statistics.put("k1", StatisticValues.stringValue("v1"));
- PartitionStatisticsUpdate update = PartitionStatisticsUpdate.of("p0",
statistics);
+ PartitionStatisticsUpdate updateP0 =
PartitionStatisticsModification.update("p0", statistics);
+ PartitionStatisticsUpdate updateP1 =
PartitionStatisticsModification.update("p1", statistics);
MetadataObjectStatisticsUpdate metadataObjectStatisticsUpdate =
- MetadataObjectStatisticsUpdate.of(metadataObject,
Lists.newArrayList(update));
+ MetadataObjectStatisticsUpdate.of(metadataObject,
Lists.newArrayList(updateP0, updateP1));
storage.updateStatistics("metalake",
Lists.newArrayList(metadataObjectStatisticsUpdate));
+ // case 1: closed lower bound
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.downTo("p0", PartitionRange.BoundType.CLOSED));
+ Assertions.assertEquals(2, stats.size());
+
+ // case 2: open upper bound
+ stats =
+ storage.listStatistics(
+ "metalake", metadataObject, PartitionRange.upTo("p0",
PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(0, stats.size());
+
+ // case 3: open lower bound
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.downTo("p0", PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, stats.size());
+
+ // case 4: closed upper bound
stats =
storage.listStatistics(
"metalake",
metadataObject,
PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
Assertions.assertEquals(1, stats.size());
- Assertions.assertEquals(stats.get(0).partitionName(), "p0");
- Map<String, StatisticValue<?>> partitionStats =
stats.get(0).statistics();
+ List<PersistedStatistic> partitionStats = stats.get(0).statistics();
+
Assertions.assertEquals(1, partitionStats.size());
- Assertions.assertTrue(partitionStats.containsKey("k1"));
- StatisticValue<?> value = partitionStats.get("k1");
+ Assertions.assertEquals(partitionStats.get(0).name(), "k1");
+ StatisticValue<?> value = partitionStats.get(0).value();
Assertions.assertEquals(StatisticValues.stringValue("v1"), value);
- PartitionStatisticsDrop drop = PartitionStatisticsDrop.of("p0",
Lists.newArrayList("k1"));
+ // case 5: between p0 and p1 with [closed, open)
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.CLOSED, "p1",
PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, stats.size());
+
+ // case 6: between p0 and p1 with (open, closed]
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.OPEN, "p1",
PartitionRange.BoundType.CLOSED));
+ Assertions.assertEquals(1, stats.size());
+
+ // case 7: between p0 and p1 with (open, open)
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.OPEN, "p1",
PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(0, stats.size());
+
+ // case 8: between p0 and p1 with [closed, closed]
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.between(
+ "p0", PartitionRange.BoundType.CLOSED, "p1",
PartitionRange.BoundType.CLOSED));
+ Assertions.assertEquals(2, stats.size());
+
+ PartitionStatisticsDrop drop =
+ PartitionStatisticsModification.drop("p0", Lists.newArrayList("k1"));
List<PartitionStatisticsDrop> drops = Lists.newArrayList(drop);
List<MetadataObjectStatisticsDrop> partitionStatisticsToDrop =
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index 7b5ed17336..a9dc5ff630 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -193,6 +193,11 @@ public class ExceptionHandlers {
return StatisticExceptionHandler.INSTANCE.handle(type, name, parent, e);
}
+ public static Response handlePartitionStatsException(
+ OperationType type, String name, String parent, Exception e) {
+ return PartitionStatsExceptionHandler.INSTANCE.handle(type, name, parent,
e);
+ }
+
private static class PartitionExceptionHandler extends BaseExceptionHandler {
private static final ExceptionHandler INSTANCE = new
PartitionExceptionHandler();
@@ -940,6 +945,35 @@ public class ExceptionHandlers {
}
}
+ private static class PartitionStatsExceptionHandler extends
BaseExceptionHandler {
+
+ private static final ExceptionHandler INSTANCE = new
PartitionStatsExceptionHandler();
+
+ private static String getPartitionStatsErrorMsg(
+ String partition, String operation, String table, String reason) {
+ return String.format(
+ "Failed to operate partition stats%s operation [%s] of table [%s],
reason [%s]",
+ partition, operation, table, reason);
+ }
+
+ @Override
+ public Response handle(OperationType op, String partition, String table,
Exception e) {
+ String formatted = StringUtil.isBlank(partition) ? "" : " [" + partition
+ "]";
+ String errorMsg = getPartitionStatsErrorMsg(formatted, op.name(), table,
getErrorMsg(e));
+ LOG.warn(errorMsg, e);
+
+ if (e instanceof IllegalArgumentException) {
+ return Utils.illegalArguments(errorMsg, e);
+
+ } else if (e instanceof NotFoundException) {
+ return Utils.notFound(errorMsg, e);
+
+ } else {
+ return super.handle(op, partition, table, e);
+ }
+ }
+ }
+
@VisibleForTesting
static class BaseExceptionHandler extends ExceptionHandler {
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
index 2ddcfc2e50..a7ef188bfb 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
@@ -20,35 +20,48 @@ package org.apache.gravitino.server.web.rest;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.dto.requests.PartitionStatisticsDropRequest;
+import org.apache.gravitino.dto.requests.PartitionStatisticsUpdateRequest;
import org.apache.gravitino.dto.requests.StatisticsDropRequest;
import org.apache.gravitino.dto.requests.StatisticsUpdateRequest;
import org.apache.gravitino.dto.responses.BaseResponse;
import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.PartitionStatisticsListResponse;
import org.apache.gravitino.dto.responses.StatisticListResponse;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
import org.apache.gravitino.dto.util.DTOConverters;
import org.apache.gravitino.exceptions.IllegalStatisticNameException;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
import org.apache.gravitino.stats.Statistic;
import org.apache.gravitino.stats.StatisticManager;
import org.apache.gravitino.stats.StatisticValue;
@@ -200,4 +213,247 @@ public class StatisticOperations {
OperationType.DROP, StringUtils.join(request.getNames(), ","),
fullName, e);
}
}
+
+ @GET
+ @Path("/partitions")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "list-partition-stats." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "list-partition-stats", absolute = true)
+ public Response listPartitionStatistics(
+ @PathParam("metalake") String metalake,
+ @PathParam("type") String type,
+ @PathParam("fullName") String fullName,
+ @QueryParam("from") String fromPartitionName,
+ @QueryParam("to") String toPartitionName,
+ @QueryParam("fromInclusive") @DefaultValue("true") boolean fromInclusive,
+ @QueryParam("toInclusive") @DefaultValue("false") boolean toInclusive) {
+
+ String formattedFromPartitionName =
+ getFormattedFromPartitionName(fromPartitionName, fromInclusive);
+ String formattedToPartitionName =
getFormattedToPartitionName(toPartitionName, toInclusive);
+
+ LOG.info(
+ "Listing partition statistics for table: {} in the metalake {} from {}
to {}",
+ fullName,
+ metalake,
+ formattedFromPartitionName,
+ formattedToPartitionName);
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ MetadataObject object =
+ MetadataObjects.parse(
+ fullName,
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+ if (object.type() != MetadataObject.Type.TABLE) {
+ throw new UnsupportedOperationException(
+ "Listing partition statistics is only supported for tables
now.");
+ }
+
+ if (fromPartitionName == null && toPartitionName == null) {
+ throw new IllegalArgumentException(
+ "Both 'from' and 'to' parameters cannot be null at the same
time.");
+ }
+
+ MetadataObjectUtil.checkMetadataObject(metalake, object);
+
+ PartitionRange range;
+ PartitionRange.BoundType fromBoundType =
getFromBoundType(fromInclusive);
+ PartitionRange.BoundType toBoundType =
getFromBoundType(toInclusive);
+ if (fromPartitionName != null && toPartitionName != null) {
+ range =
+ PartitionRange.between(
+ fromPartitionName, fromBoundType, toPartitionName,
toBoundType);
+ } else if (fromPartitionName != null) {
+ range = PartitionRange.downTo(fromPartitionName, fromBoundType);
+ } else {
+ range = PartitionRange.upTo(toPartitionName, toBoundType);
+ }
+
+ List<PartitionStatistics> statistics =
+ statisticManager.listPartitionStatistics(metalake, object,
range);
+
+ PartitionStatisticsDTO[] partitionStatistics =
+ statistics.stream()
+ .map(
+ partitionStatistic ->
+ PartitionStatisticsDTO.of(
+ partitionStatistic.partitionName(),
+
DTOConverters.toDTOs(partitionStatistic.statistics())))
+ .toArray(PartitionStatisticsDTO[]::new);
+
+ return Utils.ok(new
PartitionStatisticsListResponse(partitionStatistics));
+ });
+ } catch (Exception e) {
+ LOG.error(
+ "Error listing {},{} partition statistics for table: {} in the
metalake {}.",
+ formattedFromPartitionName,
+ formattedToPartitionName,
+ fullName,
+ metalake,
+ e);
+ return ExceptionHandlers.handlePartitionStatsException(
+ OperationType.LIST,
+ formattedFromPartitionName + "," + formattedToPartitionName,
+ fullName,
+ e);
+ }
+ }
+
+ @PUT
+ @Path("/partitions")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "update-partitions-stats." +
MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+ @ResponseMetered(name = "update-partitions-stats", absolute = true)
+ public Response updatePartitionStatistics(
+ @PathParam("metalake") String metalake,
+ @PathParam("type") String type,
+ @PathParam("fullName") String fullName,
+ PartitionStatisticsUpdateRequest request) {
+ LOG.info("Updating partition statistics for table: {} in the metalake {}",
fullName, metalake);
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ request.validate();
+
+ MetadataObject object =
+ MetadataObjects.parse(
+ fullName,
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+ if (object.type() != MetadataObject.Type.TABLE) {
+ throw new UnsupportedOperationException(
+ "Updating partition statistics is only supported for tables
now.");
+ }
+
+ List<PartitionStatisticsUpdateDTO> updates = request.getUpdates();
+ for (PartitionStatisticsUpdateDTO update : updates) {
+ update
+ .statistics()
+ .keySet()
+ .forEach(
+ statistic -> {
+ if (!statistic.startsWith(Statistic.CUSTOM_PREFIX)) {
+ // Current we only support custom statistics
+ throw new IllegalStatisticNameException(
+ "Statistic name must start with %s, but got: %s",
+ Statistic.CUSTOM_PREFIX, statistic);
+ }
+ });
+ }
+
+ MetadataObjectUtil.checkMetadataObject(metalake, object);
+
+ statisticManager.updatePartitionStatistics(
+ metalake,
+ object,
+ updates.stream()
+ .map(
+ update ->
+ PartitionStatisticsModification.update(
+ update.partitionName(), update.statistics()))
+ .collect(Collectors.toList()));
+
+ return Utils.ok(new BaseResponse(0));
+ });
+ } catch (Exception e) {
+ LOG.error(
+ "Error updating partition statistics for table: {} in the metalake
{}",
+ fullName,
+ metalake,
+ e);
+ String partitions =
+ StringUtils.joinWith(
+ ",",
+ request.getUpdates().stream()
+ .map(PartitionStatisticsUpdateDTO::partitionName)
+ .collect(Collectors.toList()));
+ return ExceptionHandlers.handlePartitionStatsException(
+ OperationType.UPDATE, partitions, fullName, e);
+ }
+ }
+
+ @POST
+ @Path("/partitions")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "drop-partitions-stats." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "drop-partitions-stats", absolute = true)
+ public Response dropPartitionStatistics(
+ @PathParam("metalake") String metalake,
+ @PathParam("type") String type,
+ @PathParam("fullName") String fullName,
+ PartitionStatisticsDropRequest request) {
+
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ request.validate();
+ MetadataObject object =
+ MetadataObjects.parse(
+ fullName,
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+ if (object.type() != MetadataObject.Type.TABLE) {
+ throw new UnsupportedOperationException(
+ "Dropping partition statistics is only supported for tables
now.");
+ }
+
+ MetadataObjectUtil.checkMetadataObject(metalake, object);
+
+ return Utils.ok(
+ new DropResponse(
+ statisticManager.dropPartitionStatistics(
+ metalake,
+ object,
+ request.getDrops().stream()
+ .map(
+ drop ->
+ PartitionStatisticsModification.drop(
+ drop.partitionName(),
drop.statisticNames()))
+ .collect(Collectors.toList()))));
+ });
+ } catch (Exception e) {
+ LOG.error(
+ "Error dropping partition statistics for table: {} in the metalake
{}.",
+ fullName,
+ metalake,
+ e);
+ String partitions =
+ StringUtils.joinWith(
+ ",",
+ request.getDrops().stream()
+ .map(PartitionStatisticsDropDTO::partitionName)
+ .collect(Collectors.toList()));
+ return ExceptionHandlers.handlePartitionStatsException(
+ OperationType.DROP, partitions, fullName, e);
+ }
+ }
+
+ @VisibleForTesting
+ static PartitionRange.BoundType getFromBoundType(boolean inclusive) {
+ return inclusive ? PartitionRange.BoundType.CLOSED :
PartitionRange.BoundType.OPEN;
+ }
+
+ private static String getFormattedFromPartitionName(
+ String fromPartitionName, boolean fromInclusive) {
+ if (fromPartitionName == null) {
+ return "(-INF";
+ } else {
+ if (fromInclusive) {
+ return "[" + fromPartitionName;
+ } else {
+ return "(" + fromPartitionName;
+ }
+ }
+ }
+
+ private static String getFormattedToPartitionName(String toPartitionName,
boolean toInclusive) {
+ if (toPartitionName == null) {
+ return "INF)";
+ } else {
+ if (toInclusive) {
+ return toPartitionName + "]";
+ } else {
+ return toPartitionName + ")";
+ }
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
index 1a55b10282..3a4c20e68f 100644
---
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
+++
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
@@ -31,7 +31,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
@@ -42,19 +44,27 @@ import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.dto.requests.PartitionStatisticsDropRequest;
+import org.apache.gravitino.dto.requests.PartitionStatisticsUpdateRequest;
import org.apache.gravitino.dto.requests.StatisticsDropRequest;
import org.apache.gravitino.dto.requests.StatisticsUpdateRequest;
import org.apache.gravitino.dto.responses.BaseResponse;
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.ErrorConstants;
import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.PartitionStatisticsListResponse;
import org.apache.gravitino.dto.responses.StatisticListResponse;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
import org.apache.gravitino.dto.stats.StatisticDTO;
import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
import org.apache.gravitino.stats.Statistic;
import org.apache.gravitino.stats.StatisticManager;
import org.apache.gravitino.stats.StatisticValue;
@@ -321,13 +331,13 @@ public class TestStatisticOperations extends JerseyTest {
.accept("application/vnd.gravitino.v1+json")
.put(entity(req, MediaType.APPLICATION_JSON_TYPE));
- Assertions.assertEquals(
- Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp3.getStatus());
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp3.getStatus());
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp3.getMediaType());
ErrorResponse errorResp3 = resp3.readEntity(ErrorResponse.class);
- Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp3.getCode());
- Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp3.getType());
+ Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE,
errorResp3.getCode());
+ Assertions.assertEquals(
+ IllegalStatisticNameException.class.getSimpleName(),
errorResp3.getType());
}
@Test
@@ -406,4 +416,326 @@ public class TestStatisticOperations extends JerseyTest {
Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp2.getCode());
Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp2.getType());
}
+
+ @Test
+ public void testListPartitionStatistics() {
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreateTime(Instant.now())
+ .withCreator("test")
+ .withLastModifiedTime(Instant.now())
+ .withLastModifier("test")
+ .build();
+
+ StatisticDTO stat1 =
+ StatisticDTO.builder()
+ .withName("test1")
+ .withValue(Optional.of(StatisticValues.stringValue("test")))
+ .withReserved(true)
+ .withModifiable(false)
+ .withAudit(DTOConverters.toDTO(auditInfo))
+ .build();
+ StatisticDTO stat2 =
+ StatisticDTO.builder()
+ .withName("test1")
+ .withValue(Optional.of(StatisticValues.longValue(1L)))
+ .withReserved(true)
+ .withModifiable(false)
+ .withAudit(DTOConverters.toDTO(auditInfo))
+ .build();
+ PartitionStatistics partitionStatistics =
+ new StatisticManager.CustomPartitionStatistic("partition1", new
Statistic[] {stat1, stat2});
+ MetadataObject tableObject =
+ MetadataObjects.parse(
+ String.format("%s.%s.%s", catalog, schema, table),
MetadataObject.Type.TABLE);
+
+ when(manager.listPartitionStatistics(any(), any(), any()))
+ .thenReturn(Lists.newArrayList(partitionStatistics));
+ when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+ Response resp =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .queryParam("from", "p0")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ PartitionStatisticsListResponse listResp =
+ resp.readEntity(PartitionStatisticsListResponse.class);
+ Assertions.assertEquals(0, listResp.getCode());
+
+ Statistic[] statisticDTOS =
listResp.getPartitionStatistics()[0].statistics();
+ Assertions.assertEquals(2, statisticDTOS.length);
+ Assertions.assertEquals(stat1.name(), statisticDTOS[0].name());
+ Assertions.assertEquals(stat1.value().get(),
statisticDTOS[0].value().get());
+ Assertions.assertEquals(stat2.name(), statisticDTOS[1].name());
+ Assertions.assertEquals(stat2.value().get(),
statisticDTOS[1].value().get());
+
+ // Test throw NoSuchMetadataObjectException
+ when(tableDispatcher.tableExists(any())).thenReturn(false);
+ Response resp1 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .queryParam("from", "p0")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp1.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp1.getMediaType());
+
+ ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(
+ NoSuchMetadataObjectException.class.getSimpleName(),
errorResp.getType());
+
+ // Test throw RuntimeException
+ when(tableDispatcher.tableExists(any())).thenReturn(true);
+ doThrow(new RuntimeException("mock error"))
+ .when(manager)
+ .listPartitionStatistics(any(), any(), any());
+ Response resp2 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .queryParam("from", "p0")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp2.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp2.getMediaType());
+
+ ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp2.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp2.getType());
+ }
+
+ @Test
+ public void testUpdatePartitionStatistics() {
+ Map<String, StatisticValue<?>> statsMap = Maps.newHashMap();
+ statsMap.put(Statistic.CUSTOM_PREFIX + "test1",
StatisticValues.stringValue("test"));
+ statsMap.put(Statistic.CUSTOM_PREFIX + "test2",
StatisticValues.longValue(1L));
+ List<PartitionStatisticsUpdateDTO> partitionStatsList =
Lists.newArrayList();
+ partitionStatsList.add(PartitionStatisticsUpdateDTO.of("partition1",
statsMap));
+ MetadataObject tableObject =
+ MetadataObjects.parse(
+ String.format("%s.%s.%s", catalog, schema, table),
MetadataObject.Type.TABLE);
+
+ PartitionStatisticsUpdateRequest req = new
PartitionStatisticsUpdateRequest(partitionStatsList);
+
+ when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+ Response resp =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ BaseResponse updateResp = resp.readEntity(BaseResponse.class);
+ Assertions.assertEquals(0, updateResp.getCode());
+
+ // Test throw NoSuchMetadataObjectException
+ when(tableDispatcher.tableExists(any())).thenReturn(false);
+
+ Response resp1 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp1.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp1.getMediaType());
+
+ ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(
+ NoSuchMetadataObjectException.class.getSimpleName(),
errorResp.getType());
+
+ when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+ // Test throw RuntimeException
+ doThrow(new RuntimeException("mock error"))
+ .when(manager)
+ .updatePartitionStatistics(any(), any(), any());
+ Response resp2 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp2.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp2.getMediaType());
+
+ ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp2.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp2.getType());
+
+ // Test throw IllegalStatisticNameException
+ statsMap.put("test1", StatisticValues.longValue(1L));
+
+ req = new PartitionStatisticsUpdateRequest(partitionStatsList);
+ Response resp3 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp3.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp3.getMediaType());
+
+ ErrorResponse errorResp3 = resp3.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE,
errorResp3.getCode());
+ Assertions.assertEquals(
+ IllegalStatisticNameException.class.getSimpleName(),
errorResp3.getType());
+ }
+
+ @Test
+ public void testDropPartitionStatistics() {
+ List<PartitionStatisticsDropDTO> partitionStatistics =
Lists.newArrayList();
+ partitionStatistics.add(
+ PartitionStatisticsDropDTO.of("partition1",
Lists.newArrayList("stat1", "stat2")));
+ PartitionStatisticsDropRequest req = new
PartitionStatisticsDropRequest(partitionStatistics);
+ when(manager.dropPartitionStatistics(any(), any(),
any())).thenReturn(true);
+ when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+ MetadataObject tableObject =
+ MetadataObjects.parse(
+ String.format("%s.%s.%s", catalog, schema, table),
MetadataObject.Type.TABLE);
+
+ Response resp =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ DropResponse dropResp = resp.readEntity(DropResponse.class);
+ Assertions.assertEquals(0, dropResp.getCode());
+ Assertions.assertTrue(dropResp.dropped());
+
+ // Test throw NoSuchMetadataObjectExcep
+ when(tableDispatcher.tableExists(any())).thenReturn(false);
+
+ Response resp1 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp1.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp1.getMediaType());
+
+ ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(
+ NoSuchMetadataObjectException.class.getSimpleName(),
errorResp.getType());
+
+ // Test throw RuntimeException
+ when(tableDispatcher.tableExists(any())).thenReturn(true);
+ doThrow(new RuntimeException("mock error"))
+ .when(manager)
+ .dropPartitionStatistics(any(), any(), any());
+ Response resp2 =
+ target(
+ "/metalakes/"
+ + metalake
+ + "/objects/"
+ + tableObject.type()
+ + "/"
+ + tableObject.fullName()
+ + "/statistics/partitions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp2.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp2.getMediaType());
+
+ ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp2.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp2.getType());
+ }
+
+ @Test
+ public void testGetBoundType() {
+ Assertions.assertEquals(
+ PartitionRange.BoundType.CLOSED,
StatisticOperations.getFromBoundType(true));
+ Assertions.assertEquals(
+ PartitionRange.BoundType.OPEN,
StatisticOperations.getFromBoundType(false));
+ }
}