This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 26c4271449 [#9654] feat(optimizer): add updater code skeleton (#9974)
26c4271449 is described below
commit 26c42714494f6ab8db16171e908ddadfb5fe51a8
Author: FANNG <[email protected]>
AuthorDate: Thu Feb 12 15:13:46 2026 +0900
[#9654] feat(optimizer): add updater code skeleton (#9974)
## What changes were proposed in this pull request?
### User perspective
- Configure the optimizer to point at your updater providers:
- Set `OptimizerConfig.STATISTICS_UPDATER_CONFIG` to your
`StatisticsUpdater` provider
name.
- Set `OptimizerConfig.METRICS_UPDATER_CONFIG` to your `MetricsUpdater`
provider name.
- Implement and register a `StatisticsCalculator` via:
`META-INF/services/
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator`.
- Construct an `Updater` with an `OptimizerEnv`.
- Run one of the entry points:
- `update(calculatorName, identifiers, UpdateType.STATISTICS)` to
persist raw statistics
for specific tables/jobs.
- `update(calculatorName, identifiers, UpdateType.METRICS)` to persist
derived metrics
(and job metrics if the calculator supports them).
- `updateAll(calculatorName, UpdateType.METRICS|STATISTICS)` for bulk
refresh, using the
calculator’s bulk methods.
### System perspective
- Adds new updater-focused optimizer APIs and implementations for
statistics/metrics flow:
- New API types: `MetricSample`, `PartitionMetricSample`,
`MetricsUpdater`,
`StatisticsUpdater`, `StatisticsCalculator`,
`SupportsCalculateTableStatistics`,
`SupportsCalculateBulkTableStatistics`,
`SupportsCalculateJobStatistics`, `SupportsCalculateBulkJobStatistics`,
`UpdateType`, `ToStatistic`, and
`TableStatisticsBundle`.
- New impls: `MetricSampleImpl`, `PartitionMetricSampleImpl`,
`StatisticEntryImpl`,
`MetricRecordImpl`, `MetricsRepository`, plus `Updater` as the
entrypoint to calculate and persist
statistics/metrics.
- Adds provider wiring:
- `OptimizerConfig` now includes `STATISTICS_UPDATER_CONFIG` and
`METRICS_UPDATER_CONFIG`.
- `ProviderUtils` creates `StatisticsUpdater` and `MetricsUpdater`
instances.
- `InstanceLoaderUtils` loads `StatisticsCalculator` by provider name.
- Removes unused/legacy updater/metrics components and SPI entries that
are out of scope
for the new updater flow.
- Adds and updates unit tests for the updater/statistics logic and keeps
existing
recommender code unchanged.
## Why are the changes needed?
Fixes: #9654
- This change introduces a clear, provider-based updater API to
calculate statistics and
persist either raw statistics or derived metrics, with explicit
entrypoints (`update`,
`updateAll`) and consistent data models (`MetricSample`,
`PartitionMetricSample`,
`TableStatisticsBundle`).
- It enables pluggable implementations for both statistics calculation
and storage, which
is required for supporting multiple sources/backends and batch refresh
workflows.
- It removes outdated or incomplete updater/metrics components so the
new API surface is
clean and consistent.
## Does this PR introduce any user-facing change?
- No.
## How was this patch tested?
- `./gradlew :maintenance:optimizer:test -PskipITs`
---
.../optimizer/api/common/MetricSample.java | 41 ++
.../api/common/PartitionMetricSample.java | 33 ++
.../api/common/TableStatisticsBundle.java | 47 +++
.../optimizer/api/updater/MetricsUpdater.java | 46 +++
.../api/updater/StatisticsCalculator.java | 33 ++
.../optimizer/api/updater/StatisticsUpdater.java | 51 +++
.../SupportsCalculateBulkJobStatistics.java | 37 ++
.../SupportsCalculateBulkTableStatistics.java | 37 ++
.../updater/SupportsCalculateJobStatistics.java | 37 ++
.../updater/SupportsCalculateTableStatistics.java | 36 ++
.../optimizer/common/MetricSampleImpl.java | 43 +++
.../common/PartitionMetricSampleImpl.java | 45 +++
.../optimizer/common/conf/OptimizerConfig.java | 18 +
.../optimizer/common/util/InstanceLoaderUtils.java | 66 ++++
.../optimizer/common/util/ProviderUtils.java | 10 +
.../optimizer/common/util/StatisticValueUtils.java | 119 ++++++
.../optimizer/recommender/Recommender.java | 23 +-
.../optimizer/updater/StatisticEntryImpl.java | 46 +++
.../maintenance/optimizer/updater/UpdateType.java | 44 +++
.../maintenance/optimizer/updater/Updater.java | 424 +++++++++++++++++++++
.../updater/metrics/storage/MetricRecord.java | 29 ++
.../updater/metrics/storage/MetricRecordImpl.java | 33 ++
.../updater/metrics/storage/MetricsRepository.java | 60 +++
.../optimizer/updater/util/ToStatistic.java | 38 ++
.../optimizer/common/util/TestProviderUtils.java | 2 -
.../common/util/TestStatisticValueUtils.java | 172 +++++++++
.../optimizer/updater/MetricsUpdaterForTest.java | 86 +++++
.../updater/StatisticsCalculatorForTest.java | 101 +++++
.../updater/StatisticsUpdaterForTest.java | 91 +++++
.../maintenance/optimizer/updater/TestUpdater.java | 160 ++++++++
...itino.maintenance.optimizer.api.common.Provider | 21 +
...ance.optimizer.api.updater.StatisticsCalculator | 18 +
...tenance.optimizer.api.updater.StatisticsUpdater | 18 +
33 files changed, 2051 insertions(+), 14 deletions(-)
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
new file mode 100644
index 0000000000..6370eef912
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents a single metric sample with a timestamp and associated
statistic. */
+@DeveloperApi
+public interface MetricSample {
+
+ /**
+ * Metric event time in epoch seconds.
+ *
+ * @return the metric timestamp in epoch seconds
+ */
+ long timestamp();
+
+ /**
+ * The statistic value sampled at this timestamp.
+ *
+ * @return the statistic entry for this sample
+ */
+ StatisticEntry<?> statistic();
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionMetricSample.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionMetricSample.java
new file mode 100644
index 0000000000..7731211c63
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionMetricSample.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Metric sample associated with a specific partition. */
+@DeveloperApi
+public interface PartitionMetricSample extends MetricSample {
+ /**
+ * Partition identifiers associated with this metric, ordered from outer to
inner level.
+ *
+ * @return the partition path for this metric sample
+ */
+ PartitionPath partition();
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableStatisticsBundle.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableStatisticsBundle.java
new file mode 100644
index 0000000000..1c718dcf48
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableStatisticsBundle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Container for table-level statistics and partition-level statistics. */
+@DeveloperApi
+public class TableStatisticsBundle {
+ private final List<StatisticEntry<?>> tableStatistics;
+ private final Map<PartitionPath, List<StatisticEntry<?>>>
partitionStatistics;
+
+ public TableStatisticsBundle(
+ List<StatisticEntry<?>> tableStatistics,
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+ this.tableStatistics = tableStatistics != null ?
List.copyOf(tableStatistics) : List.of();
+ this.partitionStatistics =
+ partitionStatistics != null ? Map.copyOf(partitionStatistics) :
Map.of();
+ }
+
+ public List<StatisticEntry<?>> tableStatistics() {
+ return tableStatistics;
+ }
+
+ public Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics() {
+ return partitionStatistics;
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
new file mode 100644
index 0000000000..9cd0707e9f
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/** Represents an updater that can update metrics for a table or job. */
+@DeveloperApi
+public interface MetricsUpdater extends Provider {
+ /**
+ * Persist table metrics.
+ *
+ * @param nameIdentifier catalog/schema/table identifier
+ * @param metrics time-series samples to write
+ */
+ void updateTableMetrics(NameIdentifier nameIdentifier, List<MetricSample>
metrics);
+
+ /**
+ * Persist job metrics.
+ *
+ * @param nameIdentifier job identifier
+ * @param metrics time-series samples to write
+ */
+ void updateJobMetrics(NameIdentifier nameIdentifier, List<MetricSample>
metrics);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
new file mode 100644
index 0000000000..e2c433f977
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/** Represents a calculator that can calculate statistics. */
+@DeveloperApi
+public interface StatisticsCalculator {
+ /** Name used for discovery via ServiceLoader. */
+ String name();
+
+ /** Prepare any external resources before first computation. */
+ void initialize(OptimizerEnv optimizerEnv);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
new file mode 100644
index 0000000000..d3b0a59cf3
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Represents an updater that can update statistics. */
+@DeveloperApi
+public interface StatisticsUpdater extends Provider {
+ /**
+ * Persist table statistics.
+ *
+ * @param tableIdentifier catalog/schema/table identifier
+ * @param tableStatistics list of table-level statistics
+ */
+ void updateTableStatistics(
+ NameIdentifier tableIdentifier, List<StatisticEntry<?>> tableStatistics);
+
+ /**
+ * Persist partition statistics.
+ *
+ * @param tableIdentifier catalog/schema/table identifier
+ * @param partitionStatistics map of partition identifier (outer to inner)
to statistic entry
+ */
+ void updatePartitionStatistics(
+ NameIdentifier tableIdentifier,
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
new file mode 100644
index 0000000000..d10aa2cecb
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Represents a provider that supports bulk job statistics calculation. */
+@DeveloperApi
+public interface SupportsCalculateBulkJobStatistics extends
SupportsCalculateJobStatistics {
+ /**
+ * Calculate job-level statistics for all identifiers discoverable by this
calculator.
+ *
+ * @return map of job identifier to its statistics; empty when none are
produced
+ */
+ Map<NameIdentifier, List<StatisticEntry<?>>> calculateAllJobStatistics();
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
new file mode 100644
index 0000000000..30072fc662
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import
org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+
+/** Represents a provider that supports bulk table statistics calculation. */
+@DeveloperApi
+public interface SupportsCalculateBulkTableStatistics extends
SupportsCalculateTableStatistics {
+ /**
+ * Calculate table-level and partition-level statistics for all identifiers
discoverable by this
+ * calculator.
+ *
+ * @return map of table identifier to its statistics bundle; empty when none
are produced
+ */
+ Map<NameIdentifier, TableStatisticsBundle> calculateBulkTableStatistics();
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
new file mode 100644
index 0000000000..4a49591138
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Represents a provider that supports job statistics. */
+@DeveloperApi
+public interface SupportsCalculateJobStatistics extends StatisticsCalculator {
+ /**
+ * Calculate job-level statistics to be persisted.
+ *
+ * @param jobIdentifier job identifier
+ * @return list of statistics; empty when none are produced
+ */
+ List<StatisticEntry<?>> calculateJobStatistics(NameIdentifier jobIdentifier);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
new file mode 100644
index 0000000000..586af4b5e1
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import
org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+
+/** Represents a provider that supports table statistics. */
+@DeveloperApi
+public interface SupportsCalculateTableStatistics extends StatisticsCalculator
{
+ /**
+ * Calculate table-level and partition-level statistics to be persisted.
+ *
+ * @param tableIdentifier catalog/schema/table identifier
+ * @return statistics bundle; contains table statistics and partition
statistics
+ */
+ TableStatisticsBundle calculateTableStatistics(NameIdentifier
tableIdentifier);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/MetricSampleImpl.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/MetricSampleImpl.java
new file mode 100644
index 0000000000..858c1729d9
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/MetricSampleImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Immutable {@link MetricSample} implementation that binds a statistic to an
event timestamp. */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class MetricSampleImpl implements MetricSample {
+ private final long timestamp;
+ @NonNull private final StatisticEntry<?> statistic;
+
+ @Override
+ public String toString() {
+ return "{" + timestamp + ": " + statistic + " }";
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionMetricSampleImpl.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionMetricSampleImpl.java
new file mode 100644
index 0000000000..a2c8542e83
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionMetricSampleImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+import
org.apache.gravitino.maintenance.optimizer.api.common.PartitionMetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Metric sample tied to a specific partition. */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class PartitionMetricSampleImpl implements PartitionMetricSample {
+ private final long timestamp;
+ @NonNull private final StatisticEntry<?> statistic;
+ @NonNull private final PartitionPath partition;
+
+ @Override
+ public String toString() {
+ return "{" + timestamp + ": " + statistic + ", partition=" + partition + "
}";
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index b0890957ac..8207b3716f 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -53,6 +53,10 @@ public class OptimizerConfig extends Config {
private static final String TABLE_META_PROVIDER = RECOMMENDER_PREFIX +
"tableMetaProvider";
private static final String JOB_SUBMITTER = RECOMMENDER_PREFIX +
"jobSubmitter";
+ private static final String UPDATER_PREFIX = OPTIMIZER_PREFIX + "updater.";
+ private static final String STATISTICS_UPDATER = UPDATER_PREFIX +
"statisticsUpdater";
+ private static final String METRICS_UPDATER = UPDATER_PREFIX +
"metricsUpdater";
+
public static final ConfigEntry<String> STATISTICS_PROVIDER_CONFIG =
new ConfigBuilder(STATISTICS_PROVIDER)
.doc(
@@ -97,6 +101,20 @@ public class OptimizerConfig extends Config {
.stringConf()
.createWithDefault(NoopJobSubmitter.NAME);
+ public static final ConfigEntry<String> STATISTICS_UPDATER_CONFIG =
+ new ConfigBuilder(STATISTICS_UPDATER)
+ .doc("The statistics updater implementation name (matches
Provider.name()).")
+ .version(ConfigConstants.VERSION_1_2_0)
+ .stringConf()
+ .create();
+
+ public static final ConfigEntry<String> METRICS_UPDATER_CONFIG =
+ new ConfigBuilder(METRICS_UPDATER)
+ .doc("The metrics updater implementation name (matches
Provider.name()).")
+ .version(ConfigConstants.VERSION_1_2_0)
+ .stringConf()
+ .create();
+
public static final ConfigEntry<String> GRAVITINO_URI_CONFIG =
new ConfigBuilder(GRAVITINO_URI)
.doc("The URI of the Gravitino server.")
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
new file mode 100644
index 0000000000..2ccdd294f2
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
+
+public class InstanceLoaderUtils {
+
+ public static <T extends StatisticsCalculator> T
createStatisticsCalculatorInstance(
+ String calculatorName) {
+ ServiceLoader<StatisticsCalculator> loader =
ServiceLoader.load(StatisticsCalculator.class);
+ List<Class<? extends T>> providers =
+ Streams.stream(loader.iterator())
+ .filter(p -> p.name().equalsIgnoreCase(calculatorName))
+ .map(p -> (Class<? extends T>) p.getClass())
+ .collect(Collectors.toList());
+
+ if (providers.isEmpty()) {
+ throw new IllegalArgumentException(
+ "No "
+ + StatisticsCalculator.class.getSimpleName()
+ + " class found for: "
+ + calculatorName);
+ } else if (providers.size() > 1) {
+ throw new IllegalArgumentException(
+ "Multiple "
+ + StatisticsCalculator.class.getSimpleName()
+ + " found for: "
+ + calculatorName);
+ } else {
+ Class<? extends T> providerClz = Iterables.getOnlyElement(providers);
+ try {
+ return providerClz.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to instantiate StatisticsCalculator: "
+ + calculatorName
+ + ", class: "
+ + providerClz.getName(),
+ e);
+ }
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
index 48b422c21e..a60b75257d 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
@@ -29,6 +29,8 @@ import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
/** Helper for loading optimizer providers via {@link ServiceLoader}. */
public class ProviderUtils {
@@ -89,4 +91,12 @@ public class ProviderUtils {
public static JobSubmitter createJobSubmitterInstance(String provider) {
return createProviderInstance(JobSubmitter.class, provider);
}
+
+ public static StatisticsUpdater createStatisticsUpdaterInstance(String
provider) {
+ return createProviderInstance(StatisticsUpdater.class, provider);
+ }
+
+ public static MetricsUpdater createMetricsUpdaterInstance(String provider) {
+ return createProviderInstance(MetricsUpdater.class, provider);
+ }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueUtils.java
new file mode 100644
index 0000000000..970c526fdc
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Type.Name;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class StatisticValueUtils {
+
+ public static Optional<StatisticValue<?>> avg(List<StatisticValue<?>>
values) {
+ if (values.isEmpty()) {
+ return Optional.empty();
+ }
+
Preconditions.checkArgument(values.stream().allMatch(StatisticValueUtils::isNumber));
+
+ Optional<StatisticValue<?>> sum = sum(values);
+ if (sum.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(StatisticValueUtils.div(sum.get(), values.size()));
+ }
+
+ public static Optional<StatisticValue<?>> sum(List<StatisticValue<?>>
values) {
+ if (values.isEmpty()) {
+ return Optional.empty();
+ }
+ Type type = getValueType(values);
+ Name longName = Types.LongType.get().name();
+ Name doubleName = Types.DoubleType.get().name();
+ if (type.name().equals(longName)) {
+ long longSum = 0L;
+ for (StatisticValue<?> value : values) {
+ longSum += ((Long) value.value()).longValue();
+ }
+ return Optional.of(StatisticValues.longValue(longSum));
+ } else if (type.name().equals(doubleName)) {
+ double doubleSum = 0.0;
+ for (StatisticValue<?> value : values) {
+ doubleSum += ((Number) value.value()).doubleValue();
+ }
+ return Optional.of(StatisticValues.doubleValue(doubleSum));
+ } else {
+ throw new IllegalArgumentException("Unsupported number type: " +
type.name());
+ }
+ }
+
+ public static StatisticValue<?> div(StatisticValue<?> value, long divisor) {
+ Preconditions.checkArgument(isNumber(value), "Value must be a number");
+ Preconditions.checkArgument(divisor != 0, "Divisor cannot be zero");
+ Type type = value.dataType();
+ Name longName = Types.LongType.get().name();
+ Name doubleName = Types.DoubleType.get().name();
+ if (type.name().equals(longName)) {
+ long longValue = ((Long) value.value()).longValue();
+ return StatisticValues.doubleValue(((double) longValue) / divisor);
+ } else if (type.name().equals(doubleName)) {
+ double doubleValue = ((Number) value.value()).doubleValue();
+ return StatisticValues.doubleValue(doubleValue / divisor);
+ } else {
+ throw new IllegalArgumentException("Unsupported number type: " +
type.name());
+ }
+ }
+
+ public static String toString(StatisticValue<?> value) {
+ Preconditions.checkArgument(value != null, "StatisticValue cannot be
null");
+ try {
+ return JsonUtils.anyFieldMapper().writeValueAsString(value);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("Failed to serialize StatisticValue:
" + value, e);
+ }
+ }
+
+ public static StatisticValue<?> fromString(String valueStr) {
+ Preconditions.checkArgument(valueStr != null, "StatisticValue string
cannot be null");
+ try {
+ return JsonUtils.anyFieldMapper().readValue(valueStr,
StatisticValue.class);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException(
+ "Failed to deserialize StatisticValue from: " + valueStr, e);
+ }
+ }
+
+ private static Type getValueType(List<StatisticValue<?>> values) {
+ Type type = values.get(0).dataType();
+ Preconditions.checkArgument(
+ values.stream().allMatch(v ->
v.dataType().name().equals(type.name())));
+ return type;
+ }
+
+ private static boolean isNumber(StatisticValue<?> value) {
+ Type type = value.dataType();
+ return type == Types.LongType.get() || type == Types.DoubleType.get();
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
index 0602e7f286..a41398463e 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
@@ -88,23 +88,22 @@ public class Recommender implements AutoCloseable {
*/
public Recommender(OptimizerEnv optimizerEnv) {
OptimizerConfig config = optimizerEnv.config();
- StrategyProvider strategyProvider = loadStrategyProvider(config);
- StatisticsProvider statisticsProvider = loadStatisticsProvider(config);
- TableMetadataProvider tableMetadataProvider =
loadTableMetadataProvider(config);
- JobSubmitter jobSubmitter = loadJobSubmitter(config);
-
this.optimizerEnv = optimizerEnv;
- this.strategyProvider = strategyProvider;
- this.statisticsProvider = statisticsProvider;
- this.tableMetadataProvider = tableMetadataProvider;
- this.jobSubmitter = jobSubmitter;
-
+ this.strategyProvider = loadStrategyProvider(config);
this.strategyProvider.initialize(optimizerEnv);
+ closeableGroup.register(strategyProvider,
StrategyProvider.class.getSimpleName());
+
+ this.statisticsProvider = loadStatisticsProvider(config);
this.statisticsProvider.initialize(optimizerEnv);
+ closeableGroup.register(statisticsProvider,
StatisticsProvider.class.getSimpleName());
+
+ this.tableMetadataProvider = loadTableMetadataProvider(config);
this.tableMetadataProvider.initialize(optimizerEnv);
- this.jobSubmitter.initialize(optimizerEnv);
+ closeableGroup.register(tableMetadataProvider,
TableMetadataProvider.class.getSimpleName());
- addToCloseableGroup();
+ this.jobSubmitter = loadJobSubmitter(config);
+ this.jobSubmitter.initialize(optimizerEnv);
+ closeableGroup.register(jobSubmitter, JobSubmitter.class.getSimpleName());
}
@VisibleForTesting
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticEntryImpl.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticEntryImpl.java
new file mode 100644
index 0000000000..d042a94abf
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticEntryImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.stats.StatisticValue;
+
+/**
+ * Immutable {@link StatisticEntry} implementation used by the updater runtime.
+ *
+ * @param <T> underlying value type
+ */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class StatisticEntryImpl<T> implements StatisticEntry<T> {
+ private final String name;
+ private final StatisticValue<T> value;
+
+ @Override
+ public String toString() {
+ return "{ " + name + " : " + value.value() + '}';
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/UpdateType.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/UpdateType.java
new file mode 100644
index 0000000000..55572fa9a9
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/UpdateType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+/** Supported updater actions. */
+public enum UpdateType {
+ /** Persist metrics derived from statistics. */
+ METRICS,
+ /** Persist statistics directly into the catalog. */
+ STATISTICS;
+
+ /**
+ * Parses a case-insensitive update type value.
+ *
+ * @param value string representation such as "statistics" or "metrics"
+ * @return matching {@link UpdateType}
+ */
+ public static UpdateType fromString(String value) {
+ for (UpdateType m : UpdateType.values()) {
+ if (m.name().equalsIgnoreCase(value)) {
+ return m;
+ }
+ }
+ throw new IllegalArgumentException(
+ "Invalid update type: " + value + ". Allowed values: statistics,
metrics");
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/Updater.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/Updater.java
new file mode 100644
index 0000000000..3acbe2b188
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/Updater.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import
org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkJobStatistics;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkTableStatistics;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateJobStatistics;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.CloseableGroup;
+import org.apache.gravitino.maintenance.optimizer.common.MetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import
org.apache.gravitino.maintenance.optimizer.common.PartitionMetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.common.util.InstanceLoaderUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point that wires together the statistics calculator and updater
providers to persist
+ * optimizer statistics or metrics.
+ *
+ * <p>Usage:
+ *
+ * <ol>
+ * <li>Configure {@link OptimizerConfig#STATISTICS_UPDATER_CONFIG} and {@link
+ * OptimizerConfig#METRICS_UPDATER_CONFIG} with provider names.
+ * <li>Instantiate {@link Updater} with an {@link OptimizerEnv} to
initialize the providers.
+ * <li>Call {@link #update(String, List, UpdateType)} for specific
identifiers or {@link
+ * #updateAll(String, UpdateType)} for bulk refresh.
+ * <li>Call {@link #close()} to release provider resources.
+ * </ol>
+ */
+public class Updater implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(Updater.class);
+
+ private StatisticsUpdater statisticsUpdater;
+ private MetricsUpdater metricsUpdater;
+ private OptimizerEnv optimizerEnv;
+ private final CloseableGroup closeableGroup = new CloseableGroup();
+
+ public Updater(OptimizerEnv optimizerEnv) {
+ this.optimizerEnv = optimizerEnv;
+ this.statisticsUpdater = loadStatisticsUpdater(optimizerEnv.config());
+ statisticsUpdater.initialize(optimizerEnv);
+ closeableGroup.register(statisticsUpdater,
StatisticsUpdater.class.getSimpleName());
+
+ this.metricsUpdater = loadMetricsUpdater(optimizerEnv.config());
+ metricsUpdater.initialize(optimizerEnv);
+ closeableGroup.register(metricsUpdater,
MetricsUpdater.class.getSimpleName());
+ }
+
+ /**
+ * Updates statistics or metrics for the provided identifiers.
+ *
+ * <p>This is the main entry point for updating a bounded set of targets.
The updater resolves the
+ * {@link StatisticsCalculator} by name, calculates table and partition
statistics, and persists
+ * either raw statistics or derived metrics based on {@code updateType}. If
the calculator
+ * implements {@link SupportsCalculateJobStatistics} and {@code updateType}
is {@link
+ * UpdateType#METRICS}, job metrics are also emitted.
+ *
+ * @param statisticsCalculatorName The provider name of the statistics
calculator.
+ * @param nameIdentifiers The identifiers to update (table and/or job).
+ * @param updateType The target update type: statistics or metrics.
+ */
+ public void update(
+ String statisticsCalculatorName,
+ List<NameIdentifier> nameIdentifiers,
+ UpdateType updateType) {
+ StatisticsCalculator calculator =
getStatisticsCalculator(statisticsCalculatorName);
+ long tableRecords = 0;
+ long partitionRecords = 0;
+ long jobRecords = 0;
+ for (NameIdentifier nameIdentifier : nameIdentifiers) {
+ if (calculator instanceof SupportsCalculateTableStatistics) {
+ SupportsCalculateTableStatistics supportTableStatistics =
+ ((SupportsCalculateTableStatistics) calculator);
+ TableStatisticsBundle bundle =
+ supportTableStatistics.calculateTableStatistics(nameIdentifier);
+ List<StatisticEntry<?>> statistics = bundle != null ?
bundle.tableStatistics() : List.of();
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+ bundle != null ? bundle.partitionStatistics() : Map.of();
+ tableRecords += countStatistics(statistics);
+ partitionRecords += countPartitionStatistics(partitionStatistics);
+ LOG.info(
+ "Updating table statistics/metrics: calculator={}, updateType={},
identifier={}",
+ statisticsCalculatorName,
+ updateType,
+ nameIdentifier);
+ updateTable(statistics, nameIdentifier, updateType);
+ updatePartition(partitionStatistics, nameIdentifier, updateType);
+ }
+ if (calculator instanceof SupportsCalculateJobStatistics
+ && UpdateType.METRICS.equals(updateType)) {
+ SupportsCalculateJobStatistics supportJobStatistics =
+ ((SupportsCalculateJobStatistics) calculator);
+ List<StatisticEntry<?>> statistics =
+ supportJobStatistics.calculateJobStatistics(nameIdentifier);
+ jobRecords += countStatistics(statistics);
+ LOG.info(
+ "Updating job metrics: calculator={}, identifier={}",
+ statisticsCalculatorName,
+ nameIdentifier);
+ updateJob(statistics, nameIdentifier);
+ }
+ }
+ System.out.println(
+ String.format(
+ "SUMMARY: %s totalRecords=%d tableRecords=%d partitionRecords=%d
jobRecords=%d",
+ updateType.name().toLowerCase(Locale.ROOT),
+ tableRecords + partitionRecords + jobRecords,
+ tableRecords,
+ partitionRecords,
+ jobRecords));
+ }
+
+ /**
+ * Updates statistics or metrics for all identifiers returned by the
calculator.
+ *
+ * <p>This is the main entry point for batch refreshes. The updater asks the
{@link
+ * StatisticsCalculator} for all table statistics (and optionally job
statistics) and persists
+ * them according to {@code updateType}. If the calculator implements {@link
+ * SupportsCalculateBulkJobStatistics} and {@code updateType} is {@link
UpdateType#METRICS}, job
+ * metrics are also emitted.
+ *
+ * @param statisticsCalculatorName The provider name of the statistics
calculator.
+ * @param updateType The target update type: statistics or metrics.
+ */
+ public void updateAll(String statisticsCalculatorName, UpdateType
updateType) {
+ StatisticsCalculator calculator =
getStatisticsCalculator(statisticsCalculatorName);
+ long tableRecords = 0;
+ long partitionRecords = 0;
+ long jobRecords = 0;
+
+ if (calculator instanceof SupportsCalculateBulkTableStatistics
supportBulkTableStatistics) {
+ Map<NameIdentifier, TableStatisticsBundle> allTableStatistics =
+ supportBulkTableStatistics.calculateBulkTableStatistics();
+ if (allTableStatistics == null) {
+ allTableStatistics = Map.of();
+ }
+
+ tableRecords += countAllTableStatistics(allTableStatistics);
+ partitionRecords += countAllPartitionStatistics(allTableStatistics);
+ allTableStatistics.forEach(
+ (identifier, bundle) -> {
+ List<StatisticEntry<?>> statistics =
+ bundle != null ? bundle.tableStatistics() : List.of();
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+ bundle != null ? bundle.partitionStatistics() : Map.of();
+ updateTable(statistics, identifier, updateType);
+ updatePartition(partitionStatistics, identifier, updateType);
+ });
+ }
+
+ if (calculator instanceof SupportsCalculateBulkJobStatistics
supportJobStatistics
+ && UpdateType.METRICS.equals(updateType)) {
+ Map<NameIdentifier, List<StatisticEntry<?>>> allJobStatistics =
+ supportJobStatistics.calculateAllJobStatistics();
+ if (allJobStatistics == null) {
+ allJobStatistics = Map.of();
+ }
+ jobRecords += countAllStatistics(allJobStatistics);
+ allJobStatistics.forEach((identifier, statistics) ->
updateJob(statistics, identifier));
+ }
+ System.out.println(
+ String.format(
+ "SUMMARY: %s totalRecords=%d tableRecords=%d partitionRecords=%d
jobRecords=%d",
+ updateType.name().toLowerCase(Locale.ROOT),
+ tableRecords + partitionRecords + jobRecords,
+ tableRecords,
+ partitionRecords,
+ jobRecords));
+ }
+
+ @VisibleForTesting
+ public MetricsUpdater getMetricsUpdater() {
+ return metricsUpdater;
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeableGroup.close();
+ }
+
+ private void updateTable(
+ List<StatisticEntry<?>> statistics, NameIdentifier tableIdentifier,
UpdateType updateType) {
+ switch (updateType) {
+ case STATISTICS:
+ updateTableStatistics(statistics, tableIdentifier);
+ break;
+ case METRICS:
+ updateTableMetrics(statistics, tableIdentifier);
+ break;
+ }
+ }
+
+ private void updatePartition(
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics,
+ NameIdentifier tableIdentifier,
+ UpdateType updateType) {
+ switch (updateType) {
+ case STATISTICS:
+ updatePartitionStatistics(partitionStatistics, tableIdentifier);
+ break;
+ case METRICS:
+ updatePartitionMetrics(partitionStatistics, tableIdentifier);
+ break;
+ }
+ }
+
+ private void updateTableStatistics(
+ List<StatisticEntry<?>> statistics, NameIdentifier tableIdentifier) {
+ LOG.info(
+ "Persisting table statistics: identifier={}, count={}, details={}",
+ tableIdentifier,
+ statistics != null ? statistics.size() : 0,
+ summarize(statistics));
+ statisticsUpdater.updateTableStatistics(tableIdentifier, statistics);
+ }
+
+ private void updatePartitionStatistics(
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics,
+ NameIdentifier tableIdentifier) {
+ if (partitionStatistics == null || partitionStatistics.isEmpty()) {
+ LOG.info(
+ "Persist partition statistics skipped: identifier={}, reason=empty
partitions",
+ tableIdentifier);
+ return;
+ }
+ LOG.info(
+ "Persisting partition statistics: identifier={}, partitions={},
names={}, sample={}",
+ tableIdentifier,
+ partitionStatistics.size(),
+ partitionNames(partitionStatistics),
+
summarize(partitionStatistics.values().stream().flatMap(Collection::stream).toList()));
+ statisticsUpdater.updatePartitionStatistics(tableIdentifier,
partitionStatistics);
+ }
+
+ private void updateTableMetrics(
+ List<StatisticEntry<?>> statistics, NameIdentifier tableIdentifier) {
+ long timestampSeconds = System.currentTimeMillis() / 1000;
+ LOG.info(
+ "Persisting table metrics: identifier={}, count={}, details={}",
+ tableIdentifier,
+ statistics != null ? statistics.size() : 0,
+ summarize(statistics));
+ metricsUpdater.updateTableMetrics(tableIdentifier, toMetrics(statistics,
timestampSeconds));
+ }
+
+ private void updatePartitionMetrics(
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics,
+ NameIdentifier tableIdentifier) {
+ if (partitionStatistics == null || partitionStatistics.isEmpty()) {
+ LOG.info(
+ "Persist partition metrics skipped: identifier={}, reason=empty
partitions",
+ tableIdentifier);
+ return;
+ }
+ long timestampSeconds = System.currentTimeMillis() / 1000;
+ List<MetricSample> partitionMetrics =
toPartitionMetrics(partitionStatistics, timestampSeconds);
+ LOG.info(
+ "Persisting partition metrics: identifier={}, partitions={}, names={},
details={}",
+ tableIdentifier,
+ partitionStatistics.size(),
+ partitionNames(partitionStatistics),
+
summarize(partitionStatistics.values().stream().flatMap(Collection::stream).toList()));
+ metricsUpdater.updateTableMetrics(tableIdentifier, partitionMetrics);
+ }
+
+ private void updateJob(List<StatisticEntry<?>> statistics, NameIdentifier
jobIdentifier) {
+ long timestampSeconds = System.currentTimeMillis() / 1000;
+
+ LOG.info(
+ "Persisting job metrics: identifier={}, count={}, details={}",
+ jobIdentifier,
+ statistics != null ? statistics.size() : 0,
+ summarize(statistics));
+ metricsUpdater.updateJobMetrics(jobIdentifier, toMetrics(statistics,
timestampSeconds));
+ }
+
+ private String summarize(List<StatisticEntry<?>> statistics) {
+ if (statistics == null || statistics.isEmpty()) {
+ return "[]";
+ }
+ int limit = Math.min(statistics.size(), 20);
+ String summary =
+ statistics.stream()
+ .limit(limit)
+ .map(stat -> stat.name() + "=" + stat.value().value())
+ .collect(Collectors.joining(", ", "[", "]"));
+ if (statistics.size() > limit) {
+ summary = summary + " ... (" + statistics.size() + " total)";
+ }
+ return summary;
+ }
+
+ private List<MetricSample> toMetrics(List<StatisticEntry<?>> statistics,
long timestamp) {
+ List<MetricSample> metrics = new ArrayList<>();
+ if (statistics != null) {
+ statistics.forEach(stat -> metrics.add(new MetricSampleImpl(timestamp,
stat)));
+ }
+ return metrics;
+ }
+
+ private List<MetricSample> toPartitionMetrics(
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics, long
timestamp) {
+ List<MetricSample> metrics = new ArrayList<>();
+ if (partitionStatistics != null) {
+ partitionStatistics.forEach(
+ (partitionPath, statisticEntries) ->
+ statisticEntries.forEach(
+ stat ->
+ metrics.add(new PartitionMetricSampleImpl(timestamp,
stat, partitionPath))));
+ }
+ return metrics;
+ }
+
+ private StatisticsCalculator getStatisticsCalculator(String
statisticsCalculatorName) {
+ StatisticsCalculator calculator =
+
InstanceLoaderUtils.createStatisticsCalculatorInstance(statisticsCalculatorName);
+ calculator.initialize(optimizerEnv);
+ return calculator;
+ }
+
+ private StatisticsUpdater loadStatisticsUpdater(OptimizerConfig config) {
+ String statisticsUpdaterName =
config.get(OptimizerConfig.STATISTICS_UPDATER_CONFIG);
+ if (statisticsUpdaterName == null || statisticsUpdaterName.isBlank()) {
+ throw new IllegalArgumentException(
+ "Statistics updater provider name is required. Set "
+ + OptimizerConfig.STATISTICS_UPDATER_CONFIG.getKey()
+ + " to a valid provider name.");
+ }
+ return
ProviderUtils.createStatisticsUpdaterInstance(statisticsUpdaterName);
+ }
+
+ private String partitionNames(Map<PartitionPath, List<StatisticEntry<?>>>
partitionStatistics) {
+ return partitionStatistics.keySet().stream()
+ .map(PartitionUtils::encodePartitionPath)
+ .collect(Collectors.joining(", ", "[", "]"));
+ }
+
+ private MetricsUpdater loadMetricsUpdater(OptimizerConfig config) {
+ String metricsUpdaterName =
config.get(OptimizerConfig.METRICS_UPDATER_CONFIG);
+ if (metricsUpdaterName == null || metricsUpdaterName.isBlank()) {
+ throw new IllegalArgumentException(
+ "Metrics updater provider name is required. Set "
+ + OptimizerConfig.METRICS_UPDATER_CONFIG.getKey()
+ + " to a valid provider name.");
+ }
+ return ProviderUtils.createMetricsUpdaterInstance(metricsUpdaterName);
+ }
+
+ private long countStatistics(List<StatisticEntry<?>> statistics) {
+ return statistics == null ? 0 : statistics.size();
+ }
+
+ private long countPartitionStatistics(
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+ if (partitionStatistics == null) {
+ return 0;
+ }
+ return
partitionStatistics.values().stream().mapToLong(this::countStatistics).sum();
+ }
+
+ private long countAllStatistics(Map<NameIdentifier, List<StatisticEntry<?>>>
statisticsByTable) {
+ if (statisticsByTable == null) {
+ return 0;
+ }
+ return
statisticsByTable.values().stream().mapToLong(this::countStatistics).sum();
+ }
+
+ private long countAllTableStatistics(
+ Map<NameIdentifier, TableStatisticsBundle> statisticsByTable) {
+ if (statisticsByTable == null) {
+ return 0;
+ }
+ return statisticsByTable.values().stream()
+ .mapToLong(bundle -> countStatistics(bundle.tableStatistics()))
+ .sum();
+ }
+
+ private long countAllPartitionStatistics(
+ Map<NameIdentifier, TableStatisticsBundle> partitionStatisticsByTable) {
+ if (partitionStatisticsByTable == null) {
+ return 0;
+ }
+ return partitionStatisticsByTable.values().stream()
+ .mapToLong(bundle ->
countPartitionStatistics(bundle.partitionStatistics()))
+ .sum();
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecord.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecord.java
new file mode 100644
index 0000000000..84da12fdcb
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecord.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage;
+
+/** Serializable metric record used by {@link MetricsRepository}. */
+public interface MetricRecord {
+ /** Timestamp in epoch seconds. */
+ long getTimestamp();
+
+ /** Encoded metric value. */
+ String getValue();
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecordImpl.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecordImpl.java
new file mode 100644
index 0000000000..4d5cda98a0
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecordImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+/** Immutable {@link MetricRecord} implementation backed by timestamp and
encoded value. */
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class MetricRecordImpl implements MetricRecord {
+ private final long timestamp;
+ private final String value;
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
new file mode 100644
index 0000000000..c921164bf4
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+
+/** SPI for persisting metrics produced by the optimizer updater. */
+public interface MetricsRepository extends AutoCloseable {
+
+ /** Initialize the storage backend with configuration properties. */
+ void initialize(Map<String, String> properties);
+
+ /** Persist a table metric (optionally scoped to a partition). */
+ void storeTableMetric(
+ NameIdentifier nameIdentifier,
+ String metricName,
+ Optional<String> partition,
+ MetricRecord metric);
+
+ /** Load table-level metrics within a time window [fromTimestamp,
toTimestamp). */
+ Map<String, List<MetricRecord>> getTableMetrics(
+ NameIdentifier nameIdentifier, long fromTimestamp, long toTimestamp);
+
+ /** Load partition-level metrics within a time window [fromTimestamp,
toTimestamp). */
+ Map<String, List<MetricRecord>> getPartitionMetrics(
+ NameIdentifier nameIdentifier, String partition, long fromTimestamp,
long toTimestamp);
+
+ /** Delete table metrics older than the supplied timestamp (epoch seconds),
exclusive. */
+ int cleanupTableMetricsBefore(long timestamp);
+
+ /** Persist a job metric. */
+ void storeJobMetric(NameIdentifier nameIdentifier, String metricName,
MetricRecord metric);
+
+ /** Load job metrics within a time window [fromTimestamp, toTimestamp). */
+ Map<String, List<MetricRecord>> getJobMetrics(
+ NameIdentifier nameIdentifier, long fromTimestamp, long toTimestamp);
+
+ /** Delete job metrics older than the supplied timestamp (epoch seconds),
exclusive. */
+ int cleanupJobMetricsBefore(long timestamp);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/util/ToStatistic.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/util/ToStatistic.java
new file mode 100644
index 0000000000..4a13628a93
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/util/ToStatistic.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.util;
+
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/**
+ * Represents an object that can be converted into statistics entries.
+ *
+ * <p>This interface is intended for updater components that need to expose
their statistics in a
+ * common format.
+ */
+public interface ToStatistic {
+ /**
+ * Converts this object into a list of statistics entries.
+ *
+ * @return list of statistics entries; empty when no statistics are produced
+ */
+ List<StatisticEntry<?>> toStatistic();
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
index 2855194509..bc90dd9e35 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -68,6 +68,4 @@ public class TestProviderUtils {
Assertions.assertNotNull(tableMetadataProvider);
Assertions.assertTrue(tableMetadataProvider instanceof
GravitinoTableMetadataProvider);
}
-
- // Updater/monitor providers removed for recommender-only scope.
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestStatisticValueUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestStatisticValueUtils.java
new file mode 100644
index 0000000000..574cb8768d
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestStatisticValueUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestStatisticValueUtils {
+
+ @Test
+ void avgReturnsEmptyForEmptyList() {
+ Assertions.assertTrue(StatisticValueUtils.avg(List.of()).isEmpty());
+ }
+
+ @Test
+ void avgCalculatesCorrectAverageForLongValues() {
+ List<StatisticValue<?>> values =
+ List.of(
+ StatisticValues.longValue(10L),
+ StatisticValues.longValue(20L),
+ StatisticValues.longValue(30L));
+ StatisticValue<?> result = StatisticValueUtils.avg(values).orElseThrow();
+ Assertions.assertEquals(20.0, result.value());
+ }
+
+ @Test
+ void avgCalculatesCorrectAverageForDoubleValues() {
+ List<StatisticValue<?>> values =
+ List.of(
+ StatisticValues.doubleValue(10.5),
+ StatisticValues.doubleValue(20.5),
+ StatisticValues.doubleValue(30.5));
+ StatisticValue<?> result = StatisticValueUtils.avg(values).orElseThrow();
+ Assertions.assertEquals(20.5, result.value());
+ }
+
+ @Test
+ void sumReturnsEmptyForEmptyList() {
+ Assertions.assertTrue(StatisticValueUtils.sum(List.of()).isEmpty());
+ }
+
+ @Test
+ void sumCalculatesCorrectSumForLongValues() {
+ List<StatisticValue<?>> values =
+ List.of(
+ StatisticValues.longValue(10L),
+ StatisticValues.longValue(20L),
+ StatisticValues.longValue(30L));
+ StatisticValue<?> result = StatisticValueUtils.sum(values).orElseThrow();
+ Assertions.assertEquals(60L, result.value());
+ }
+
+ @Test
+ void sumCalculatesCorrectSumForDoubleValues() {
+ List<StatisticValue<?>> values =
+ List.of(
+ StatisticValues.doubleValue(10.5),
+ StatisticValues.doubleValue(20.5),
+ StatisticValues.doubleValue(30.5));
+ StatisticValue<?> result = StatisticValueUtils.sum(values).orElseThrow();
+ Assertions.assertEquals(61.5, result.value());
+ }
+
+ @Test
+ void divThrowsExceptionForZeroDivisor() {
+ StatisticValue<?> value = StatisticValues.longValue(10L);
+ IllegalArgumentException exception =
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class, () ->
StatisticValueUtils.div(value, 0));
+ Assertions.assertEquals("Divisor cannot be zero", exception.getMessage());
+ }
+
+ @Test
+ void divCalculatesCorrectDivisionForLongValues() {
+ StatisticValue<?> value = StatisticValues.longValue(10L);
+ StatisticValue<?> result = StatisticValueUtils.div(value, 2);
+ Assertions.assertEquals(5.0, result.value());
+ }
+
+ @Test
+ void divCalculatesCorrectDivisionForDoubleValues() {
+ StatisticValue<?> value = StatisticValues.doubleValue(10.5);
+ StatisticValue<?> result = StatisticValueUtils.div(value, 2);
+ Assertions.assertEquals(5.25, result.value());
+ }
+
+ @Test
+ void avgThrowsExceptionForNonNumberValues() {
+ List<StatisticValue<?>> values =
+ List.of(
+ StatisticValues.longValue(10L),
+ StatisticValues.doubleValue(20.5),
+ StatisticValues.stringValue("invalid"));
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class, () -> StatisticValueUtils.avg(values));
+ }
+
+ @Test
+ void testSerDeserEquals() {
+ StatisticValue<?> value = StatisticValues.longValue(100L);
+ String result = StatisticValueUtils.toString(value);
+ StatisticValue<?> deserializeValue =
StatisticValueUtils.fromString(result);
+ assertEquals(value, deserializeValue);
+
+ StatisticValue<?> doubleValue = StatisticValues.doubleValue(100.5);
+ result = StatisticValueUtils.toString(doubleValue);
+ deserializeValue = StatisticValueUtils.fromString(result);
+ assertEquals(doubleValue, deserializeValue);
+ }
+
+ @Test
+ void toStringSerializesStatisticValueCorrectly() {
+ StatisticValue<?> value = StatisticValues.longValue(100L);
+ String result = StatisticValueUtils.toString(value);
+ assertEquals("100", result);
+
+ value = StatisticValues.doubleValue(100.5);
+ result = StatisticValueUtils.toString(value);
+ assertEquals("100.5", result);
+ }
+
+ @Test
+ void fromStringDeserializesStatisticValueCorrectly() {
+ String valueStr = "100";
+ StatisticValue<?> result = StatisticValueUtils.fromString(valueStr);
+ assertTrue(result instanceof StatisticValues.LongValue);
+ assertEquals(100L, ((StatisticValues.LongValue) result).value());
+
+ String doubleValueStr = "100.5";
+ StatisticValue<?> doubleResult =
StatisticValueUtils.fromString(doubleValueStr);
+ assertTrue(doubleResult instanceof StatisticValues.DoubleValue);
+ assertEquals(100.5, ((StatisticValues.DoubleValue) doubleResult).value());
+ }
+
+ @Test
+ void toStringThrowsExceptionForNullValue() {
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () ->
StatisticValueUtils.toString(null));
+ assertEquals("StatisticValue cannot be null", exception.getMessage());
+ }
+
+ @Test
+ void fromStringThrowsExceptionForNullString() {
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () ->
StatisticValueUtils.fromString(null));
+ assertEquals("StatisticValue string cannot be null",
exception.getMessage());
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/MetricsUpdaterForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/MetricsUpdaterForTest.java
new file mode 100644
index 0000000000..5f02504306
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/MetricsUpdaterForTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+public class MetricsUpdaterForTest implements MetricsUpdater {
+
+ public static final String NAME = "test-metrics-updater";
+ private static final List<MetricsUpdaterForTest> INSTANCES =
+ Collections.synchronizedList(new ArrayList<>());
+ private final AtomicInteger tableUpdates = new AtomicInteger();
+ private final AtomicInteger jobUpdates = new AtomicInteger();
+ private final AtomicInteger closeCalls = new AtomicInteger();
+
+ public MetricsUpdaterForTest() {
+ INSTANCES.add(this);
+ }
+
+ public static List<MetricsUpdaterForTest> instances() {
+ return new ArrayList<>(INSTANCES);
+ }
+
+ public static void reset() {
+ INSTANCES.clear();
+ }
+
+ public int tableUpdates() {
+ return tableUpdates.get();
+ }
+
+ public int jobUpdates() {
+ return jobUpdates.get();
+ }
+
+ public int closeCalls() {
+ return closeCalls.get();
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ @Override
+ public void updateTableMetrics(NameIdentifier nameIdentifier,
List<MetricSample> metrics) {
+ tableUpdates.incrementAndGet();
+ }
+
+ @Override
+ public void updateJobMetrics(NameIdentifier nameIdentifier,
List<MetricSample> metrics) {
+ jobUpdates.incrementAndGet();
+ }
+
+ @Override
+ public void close() {
+ closeCalls.incrementAndGet();
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsCalculatorForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsCalculatorForTest.java
new file mode 100644
index 0000000000..3736deff07
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsCalculatorForTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import
org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkJobStatistics;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class StatisticsCalculatorForTest
+ implements SupportsCalculateBulkTableStatistics,
SupportsCalculateBulkJobStatistics {
+
+ public static final String NAME = "test-statistics-calculator";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ @Override
+ public TableStatisticsBundle calculateTableStatistics(NameIdentifier
tableIdentifier) {
+ List<StatisticEntry<?>> tableStatistics = List.of(entry("row_count", 10L));
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+ Map.of(
+ PartitionPath.of(List.of(new TestPartitionEntry("p1", "v1"))),
+ List.of(entry("row_count", 3L)),
+ PartitionPath.of(List.of(new TestPartitionEntry("p2", "v2"))),
+ List.of(entry("row_count", 7L)));
+ return new TableStatisticsBundle(tableStatistics, partitionStatistics);
+ }
+
+ @Override
+ public List<StatisticEntry<?>> calculateJobStatistics(NameIdentifier
jobIdentifier) {
+ return List.of(entry("output_rows", 5L));
+ }
+
+ @Override
+ public Map<NameIdentifier, TableStatisticsBundle>
calculateBulkTableStatistics() {
+ NameIdentifier identifier = NameIdentifier.of("catalog", "schema",
"table");
+ return Map.of(identifier, calculateTableStatistics(identifier));
+ }
+
+ @Override
+ public Map<NameIdentifier, List<StatisticEntry<?>>>
calculateAllJobStatistics() {
+ NameIdentifier identifier = NameIdentifier.of("job", "sample");
+ return Map.of(identifier, calculateJobStatistics(identifier));
+ }
+
+ private static StatisticEntry<?> entry(String name, long value) {
+ StatisticValue statisticValue = StatisticValues.longValue(value);
+ return new StatisticEntryImpl(name, statisticValue);
+ }
+
+ private static final class TestPartitionEntry implements PartitionEntry {
+ private final String name;
+ private final String value;
+
+ private TestPartitionEntry(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ @Override
+ public String partitionName() {
+ return name;
+ }
+
+ @Override
+ public String partitionValue() {
+ return value;
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsUpdaterForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsUpdaterForTest.java
new file mode 100644
index 0000000000..a2e19a3d15
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsUpdaterForTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+public class StatisticsUpdaterForTest implements StatisticsUpdater {
+
+ public static final String NAME = "test-statistics-updater";
+ private static final List<StatisticsUpdaterForTest> INSTANCES =
+ Collections.synchronizedList(new ArrayList<>());
+ private final AtomicInteger tableUpdates = new AtomicInteger();
+ private final AtomicInteger partitionUpdates = new AtomicInteger();
+ private final AtomicInteger closeCalls = new AtomicInteger();
+
+ public StatisticsUpdaterForTest() {
+ INSTANCES.add(this);
+ }
+
+ public static List<StatisticsUpdaterForTest> instances() {
+ return new ArrayList<>(INSTANCES);
+ }
+
+ public static void reset() {
+ INSTANCES.clear();
+ }
+
+ public int tableUpdates() {
+ return tableUpdates.get();
+ }
+
+ public int partitionUpdates() {
+ return partitionUpdates.get();
+ }
+
+ public int closeCalls() {
+ return closeCalls.get();
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ @Override
+ public void updateTableStatistics(
+ NameIdentifier tableIdentifier, List<StatisticEntry<?>> tableStatistics)
{
+ tableUpdates.incrementAndGet();
+ }
+
+ @Override
+ public void updatePartitionStatistics(
+ NameIdentifier tableIdentifier,
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+ partitionUpdates.incrementAndGet();
+ }
+
+ @Override
+ public void close() {
+ closeCalls.incrementAndGet();
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/TestUpdater.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/TestUpdater.java
new file mode 100644
index 0000000000..f7eb63b8c8
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/TestUpdater.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestUpdater {
+
+ @AfterEach
+ void tearDown() {
+ StatisticsUpdaterForTest.reset();
+ MetricsUpdaterForTest.reset();
+ }
+
+ @Test
+ void testUpdateStatistics() {
+ OptimizerEnv optimizerEnv =
+ mockOptimizerEnv(StatisticsUpdaterForTest.NAME,
MetricsUpdaterForTest.NAME);
+ NameIdentifier identifier = NameIdentifier.of("catalog", "schema",
"table");
+
+ createUpdater(optimizerEnv)
+ .update(StatisticsCalculatorForTest.NAME, List.of(identifier),
UpdateType.STATISTICS);
+
+ StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+ MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+ assertNotNull(statisticsUpdater);
+ assertNotNull(metricsUpdater);
+ assertEquals(1, statisticsUpdater.tableUpdates());
+ assertEquals(1, statisticsUpdater.partitionUpdates());
+ assertEquals(0, metricsUpdater.tableUpdates());
+ assertEquals(0, metricsUpdater.jobUpdates());
+ }
+
+ @Test
+ void testUpdateMetricsWithJobStatistics() {
+ OptimizerEnv optimizerEnv =
+ mockOptimizerEnv(StatisticsUpdaterForTest.NAME,
MetricsUpdaterForTest.NAME);
+ NameIdentifier identifier = NameIdentifier.of("catalog", "schema",
"table");
+
+ createUpdater(optimizerEnv)
+ .update(StatisticsCalculatorForTest.NAME, List.of(identifier),
UpdateType.METRICS);
+
+ StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+ MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+ assertNotNull(statisticsUpdater);
+ assertNotNull(metricsUpdater);
+ assertEquals(0, statisticsUpdater.tableUpdates());
+ assertEquals(0, statisticsUpdater.partitionUpdates());
+ assertEquals(2, metricsUpdater.tableUpdates());
+ assertEquals(1, metricsUpdater.jobUpdates());
+ }
+
+ @Test
+ void testUpdateAllStatistics() {
+ OptimizerEnv optimizerEnv =
+ mockOptimizerEnv(StatisticsUpdaterForTest.NAME,
MetricsUpdaterForTest.NAME);
+
+ createUpdater(optimizerEnv).updateAll(StatisticsCalculatorForTest.NAME,
UpdateType.STATISTICS);
+
+ StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+ MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+ assertNotNull(statisticsUpdater);
+ assertNotNull(metricsUpdater);
+ assertEquals(1, statisticsUpdater.tableUpdates());
+ assertEquals(1, statisticsUpdater.partitionUpdates());
+ assertEquals(0, metricsUpdater.tableUpdates());
+ assertEquals(0, metricsUpdater.jobUpdates());
+ }
+
+ @Test
+ void testUpdateAllMetricsWithJobStatistics() {
+ OptimizerEnv optimizerEnv =
+ mockOptimizerEnv(StatisticsUpdaterForTest.NAME,
MetricsUpdaterForTest.NAME);
+
+ createUpdater(optimizerEnv).updateAll(StatisticsCalculatorForTest.NAME,
UpdateType.METRICS);
+
+ StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+ MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+ assertNotNull(statisticsUpdater);
+ assertNotNull(metricsUpdater);
+ assertEquals(0, statisticsUpdater.tableUpdates());
+ assertEquals(0, statisticsUpdater.partitionUpdates());
+ assertEquals(2, metricsUpdater.tableUpdates());
+ assertEquals(1, metricsUpdater.jobUpdates());
+ }
+
+ @Test
+ void testCloseClosesProviders() throws Exception {
+ OptimizerEnv optimizerEnv =
+ mockOptimizerEnv(StatisticsUpdaterForTest.NAME,
MetricsUpdaterForTest.NAME);
+
+ Updater updater = createUpdater(optimizerEnv);
+ updater.close();
+
+ StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+ MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+ assertNotNull(statisticsUpdater);
+ assertNotNull(metricsUpdater);
+ assertEquals(1, statisticsUpdater.closeCalls());
+ assertEquals(1, metricsUpdater.closeCalls());
+ }
+
+ private Updater createUpdater(OptimizerEnv optimizerEnv) {
+ return new Updater(optimizerEnv);
+ }
+
+ private StatisticsUpdaterForTest selectStatisticsUpdater() {
+ return StatisticsUpdaterForTest.instances().stream()
+ .max(
+ Comparator.comparingInt(
+ updater ->
+ updater.tableUpdates() + updater.partitionUpdates() +
updater.closeCalls()))
+ .orElse(null);
+ }
+
+ private MetricsUpdaterForTest selectMetricsUpdater() {
+ return MetricsUpdaterForTest.instances().stream()
+ .max(
+ Comparator.comparingInt(
+ updater -> updater.tableUpdates() + updater.jobUpdates() +
updater.closeCalls()))
+ .orElse(null);
+ }
+
+ private OptimizerEnv mockOptimizerEnv(String statisticsUpdater, String
metricsUpdater) {
+ OptimizerConfig config = Mockito.mock(OptimizerConfig.class);
+ Mockito.when(config.get(OptimizerConfig.STATISTICS_UPDATER_CONFIG))
+ .thenReturn(statisticsUpdater);
+
Mockito.when(config.get(OptimizerConfig.METRICS_UPDATER_CONFIG)).thenReturn(metricsUpdater);
+ OptimizerEnv optimizerEnv = Mockito.mock(OptimizerEnv.class);
+ Mockito.when(optimizerEnv.config()).thenReturn(config);
+ return optimizerEnv;
+ }
+}
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
new file mode 100644
index 0000000000..1948a23487
--- /dev/null
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
+org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
new file mode 100644
index 0000000000..2e0ab7cd28
--- /dev/null
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.gravitino.maintenance.optimizer.updater.StatisticsCalculatorForTest
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater
new file mode 100644
index 0000000000..ee1202f65e
--- /dev/null
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest