This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 4c74bd8db3 [#9652] feat(maintenance): support Gravitino providers for
recommender (#9651)
4c74bd8db3 is described below
commit 4c74bd8db3b66f598dc084c3302a75fa9aa5f51d
Author: FANNG <[email protected]>
AuthorDate: Thu Jan 8 18:36:19 2026 +0900
[#9652] feat(maintenance): support Gravitino providers for recommender
(#9651)
### What changes were proposed in this pull request?
support Gravitino providers for recommender
### Why are the changes needed?
Fix: #9652
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
---
build.gradle.kts | 2 +
maintenance/optimizer/build.gradle.kts | 23 +++-
.../optimizer/common/PartitionEntryImpl.java | 38 ++++++
.../optimizer/common/StatisticEntryImpl.java | 51 +++++++
.../optimizer/common/conf/OptimizerConfig.java | 28 ++--
.../common/util/GravitinoClientUtils.java | 45 ++++++
.../optimizer/common/util/IdentifierUtils.java | 74 ++++++++++
.../optimizer/recommender/Recommender.java | 27 +++-
.../recommender/job/GravitinoJobAdapter.java | 43 ++++++
.../recommender/job/GravitinoJobSubmitter.java | 95 +++++++++++++
.../recommender/job/NoopJobSubmitter.java | 76 +++++++++++
.../statistics/GravitinoStatisticsProvider.java | 141 +++++++++++++++++++
.../recommender/strategy/GravitinoStrategy.java | 121 +++++++++++++++++
.../strategy/GravitinoStrategyProvider.java | 123 +++++++++++++++++
.../table/GravitinoTableMetadataProvider.java | 81 +++++++++++
.../optimizer/recommender/util/PartitionUtils.java | 106 +++++++++++++++
...itino.maintenance.optimizer.api.common.Provider | 23 ++++
.../optimizer/common/conf/TestOptimizerConfig.java | 56 ++++++++
.../optimizer/common/util/TestIdentifierUtils.java | 106 +++++++++++++++
.../optimizer/common/util/TestProviderUtils.java | 73 ++++++++++
.../recommender/TestRecommenderOrdering.java | 91 +++++++++++++
.../recommender/TestStrategyFiltering.java | 151 +++++++++++++++++++++
.../recommender/util/TestPartitionUtils.java | 74 ++++++++++
23 files changed, 1634 insertions(+), 14 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index eb8b45ade9..02ae5cbdf4 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -995,6 +995,7 @@ tasks {
!it.name.startsWith("flink") &&
!it.name.startsWith("iceberg") &&
!it.name.startsWith("lance") &&
+ !it.name.startsWith("optimizer") &&
!it.name.startsWith("spark") &&
!it.name.startsWith("hive-metastore") &&
it.name != "hadoop-common" &&
@@ -1042,6 +1043,7 @@ tasks {
!it.name.startsWith("trino-connector") &&
it.name != "hive-metastore2-libs" &&
it.name != "hive-metastore3-libs" &&
+ !it.name.startsWith("optimizer") &&
it.name != "hive-metastore-common" &&
it.name != "docs" &&
it.name != "hadoop-common" &&
diff --git a/maintenance/optimizer/build.gradle.kts
b/maintenance/optimizer/build.gradle.kts
index 66dfe8eb18..638dadbe3f 100644
--- a/maintenance/optimizer/build.gradle.kts
+++ b/maintenance/optimizer/build.gradle.kts
@@ -26,10 +26,29 @@ plugins {
dependencies {
implementation(project(":api"))
- implementation(project(":common"))
+ implementation(project(":catalogs:catalog-common"))
+ implementation(project(":clients:client-java"))
+ implementation(project(":core")) {
+ exclude("*")
+ }
+ implementation(project(":common")) {
+ exclude("*")
+ }
+ implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
+ implementation(libs.jackson.databind)
+ implementation(libs.jackson.annotations)
implementation(libs.guava)
- implementation(libs.slf4j.api)
+
+ annotationProcessor(libs.lombok)
+ compileOnly(libs.lombok)
+
+ testImplementation(libs.junit.jupiter.api)
+ testImplementation(libs.junit.jupiter.params)
+ testAnnotationProcessor(libs.lombok)
+ testCompileOnly(libs.lombok)
+
+ testRuntimeOnly(libs.junit.jupiter.engine)
}
tasks {
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
new file mode 100644
index 0000000000..3a2ce42f92
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+
+/** Immutable {@link PartitionEntry} implementation backed by Lombok. */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+@ToString
+public final class PartitionEntryImpl implements PartitionEntry {
+ private final String partitionName;
+ private final String partitionValue;
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
new file mode 100644
index 0000000000..fef59f2f1c
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.stats.StatisticValue;
+
+/**
+ * Immutable {@link StatisticEntry} implementation backed by Lombok.
+ *
+ * @param <T> underlying value type
+ */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class StatisticEntryImpl<T> implements StatisticEntry<T> {
+ private final String name;
+ private final StatisticValue<T> value;
+
+ /**
+ * Returns a concise string for logging and diagnostics.
+ *
+ * @return formatted string containing the statistic name and value
+ */
+ @Override
+ public String toString() {
+ return "{ " + name + " : " + value.value() + '}';
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index bf96e359dd..3f62b85c17 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -25,6 +25,10 @@ import org.apache.gravitino.Config;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
+import
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
+import
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
+import
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
/**
* Central configuration holder for the optimizer/recommender runtime. Keys
are grouped under the
@@ -51,37 +55,45 @@ public class OptimizerConfig extends Config {
new ConfigBuilder(STATISTICS_PROVIDER)
.doc(
"Statistics provider implementation name (matches
Provider.name()) discoverable via "
- + "ServiceLoader. Example: 'gravitino-statistics-provider'.")
+ + "ServiceLoader. Example: '"
+ + GravitinoStatisticsProvider.NAME
+ + "'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(GravitinoStatisticsProvider.NAME);
public static final ConfigEntry<String> STRATEGY_PROVIDER_CONFIG =
new ConfigBuilder(STRATEGY_PROVIDER)
.doc(
"Strategy provider implementation name (matches Provider.name())
discoverable via "
- + "ServiceLoader. Example: 'gravitino-strategy-provider'.")
+ + "ServiceLoader. Example: '"
+ + GravitinoStrategyProvider.NAME
+ + "'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(GravitinoStrategyProvider.NAME);
public static final ConfigEntry<String> TABLE_META_PROVIDER_CONFIG =
new ConfigBuilder(TABLE_META_PROVIDER)
.doc(
"Table metadata provider implementation name (matches
Provider.name()) discoverable "
- + "via ServiceLoader. Example:
'gravitino-table-metadata-provider'.")
+ + "via ServiceLoader. Example: '"
+ + GravitinoTableMetadataProvider.NAME
+ + "'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(GravitinoTableMetadataProvider.NAME);
public static final ConfigEntry<String> JOB_SUBMITTER_CONFIG =
new ConfigBuilder(JOB_SUBMITTER)
.doc(
"Job submitter implementation name (matches Provider.name())
discoverable via "
- + "ServiceLoader. Example: 'gravitino-job-submitter'.")
+ + "ServiceLoader. Example: '"
+ + NoopJobSubmitter.NAME
+ + "'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(NoopJobSubmitter.NAME);
public static final ConfigEntry<String> GRAVITINO_URI_CONFIG =
new ConfigBuilder(GRAVITINO_URI)
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
new file mode 100644
index 0000000000..052d7b2d72
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+
+/** Utility methods for creating Gravitino clients from optimizer
configuration. */
+public final class GravitinoClientUtils {
+
+ private GravitinoClientUtils() {}
+
+ /**
+ * Creates a {@link GravitinoClient} using optimizer configuration.
+ *
+ * @param optimizerEnv optimizer environment
+ * @return configured Gravitino client
+ */
+ public static GravitinoClient createClient(OptimizerEnv optimizerEnv) {
+ Preconditions.checkArgument(optimizerEnv != null, "optimizerEnv must not
be null");
+ OptimizerConfig config = optimizerEnv.config();
+ String uri = config.get(OptimizerConfig.GRAVITINO_URI_CONFIG);
+ String metalake = config.get(OptimizerConfig.GRAVITINO_METALAKE_CONFIG);
+ return GravitinoClient.builder(uri).withMetalake(metalake).build();
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
new file mode 100644
index 0000000000..415e3b5634
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+
+/** Utilities for working with fully qualified table identifiers. */
+public class IdentifierUtils {
+
+ private static final String NORMALIZED_IDENTIFIER_MESSAGE =
+ "Identifier must be catalog.schema.table";
+
+ /**
+ * Removes the catalog level from a catalog.schema.table identifier.
+ *
+ * @param tableIdentifier fully qualified table identifier
+ * @return schema.table identifier
+ * @throws IllegalArgumentException if the identifier is not
catalog.schema.table
+ */
+ public static NameIdentifier removeCatalogFromIdentifier(NameIdentifier
tableIdentifier) {
+ Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier must
not be null");
+ Namespace namespace = tableIdentifier.namespace();
+ Preconditions.checkArgument(
+ namespace != null && namespace.levels().length == 2,
NORMALIZED_IDENTIFIER_MESSAGE);
+ return NameIdentifier.of(namespace.levels()[1], tableIdentifier.name());
+ }
+
+ /**
+ * Returns the catalog name from a catalog.schema.table identifier.
+ *
+ * @param tableIdentifier fully qualified table identifier
+ * @return catalog name
+ * @throws IllegalArgumentException if the identifier is not
catalog.schema.table
+ */
+ public static String getCatalogNameFromTableIdentifier(NameIdentifier
tableIdentifier) {
+ Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier must
not be null");
+ Namespace namespace = tableIdentifier.namespace();
+ Preconditions.checkArgument(
+ namespace != null && namespace.levels().length == 2,
NORMALIZED_IDENTIFIER_MESSAGE);
+ return namespace.levels()[0];
+ }
+
+ /**
+ * Validates that a table identifier is normalized as catalog.schema.table.
+ *
+ * @param tableIdentifier identifier to validate
+ * @throws IllegalArgumentException if the identifier is not
catalog.schema.table
+ */
+ public static void requireTableIdentifierNormalized(NameIdentifier
tableIdentifier) {
+ Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier must
not be null");
+ Namespace namespace = tableIdentifier.namespace();
+ Preconditions.checkArgument(
+ namespace != null && namespace.levels().length == 2,
NORMALIZED_IDENTIFIER_MESSAGE);
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
index 8cb960fd52..c5a142293a 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.maintenance.optimizer.recommender;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
@@ -102,10 +103,21 @@ public class Recommender implements AutoCloseable {
this.tableMetadataProvider.initialize(optimizerEnv);
this.jobSubmitter.initialize(optimizerEnv);
- closeableGroup.register(strategyProvider, "strategy provider");
- closeableGroup.register(statisticsProvider, "statistics provider");
- closeableGroup.register(tableMetadataProvider, "table metadata provider");
- closeableGroup.register(jobSubmitter, "job submitter");
+ addToCloseableGroup();
+ }
+
+ @VisibleForTesting
+ Recommender(
+ StrategyProvider strategyProvider,
+ StatisticsProvider statisticsProvider,
+ TableMetadataProvider tableMetadataProvider,
+ JobSubmitter jobSubmitter) {
+ this.strategyProvider = strategyProvider;
+ this.statisticsProvider = statisticsProvider;
+ this.tableMetadataProvider = tableMetadataProvider;
+ this.jobSubmitter = jobSubmitter;
+
+ addToCloseableGroup();
}
/**
@@ -143,6 +155,13 @@ public class Recommender implements AutoCloseable {
closeableGroup.close();
}
+ private void addToCloseableGroup() {
+ closeableGroup.register(strategyProvider, "strategy provider");
+ closeableGroup.register(statisticsProvider, "statistics provider");
+ closeableGroup.register(tableMetadataProvider, "table metadata provider");
+ closeableGroup.register(jobSubmitter, "job submitter");
+ }
+
private List<JobExecutionContext> recommendForOneStrategy(
List<NameIdentifier> identifiers, String strategyName) {
LOG.info("Recommend strategy {} for identifiers {}", strategyName,
identifiers);
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobAdapter.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobAdapter.java
new file mode 100644
index 0000000000..8e030d4988
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.Map;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+
+/**
+ * Translates optimizer job execution context into Gravitino job submission
inputs.
+ *
+ * <p>Usage: {@link GravitinoJobSubmitter} looks up an adapter for the
requested job template,
+ * creates a new instance, and calls {@link #jobConfig(JobExecutionContext)}
when submitting the job
+ * to Gravitino.
+ */
+public interface GravitinoJobAdapter {
+
+ /**
+ * Returns the Gravitino job configuration map derived from the execution
context.
+ *
+ * <p>The returned map is supplied as job parameters when submitting the job
to Gravitino.
+ *
+ * @param jobExecutionContext job execution context
+ * @return job configuration map
+ */
+ Map<String, String> jobConfig(JobExecutionContext jobExecutionContext);
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
new file mode 100644
index 0000000000..455d51795d
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import org.apache.gravitino.client.GravitinoClient;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+
+/** Submits optimizer jobs to Gravitino using job template adapters. */
+public class GravitinoJobSubmitter implements JobSubmitter {
+
+ public static final String NAME = "gravitino-job-submitter";
+
+ private GravitinoClient gravitinoClient;
+
+ private final Map<String, Class<? extends GravitinoJobAdapter>> jobAdapters
= Map.of();
+
+ /**
+ * Returns the provider name for configuration lookup.
+ *
+ * @return provider name
+ */
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /**
+ * Initializes the submitter with a Gravitino client derived from the
optimizer configuration.
+ *
+ * @param optimizerEnv optimizer environment
+ */
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
+ }
+
+ /**
+ * Submits a job through Gravitino using the resolved job adapter.
+ *
+ * @param jobTemplateName template name used to select an adapter
+ * @param jobExecutionContext execution context for the job
+ * @return submitted job identifier
+ */
+ @Override
+ public String submitJob(String jobTemplateName, JobExecutionContext
jobExecutionContext) {
+ GravitinoJobAdapter jobAdapter = loadJobAdapter(jobTemplateName);
+ return gravitinoClient
+ .runJob(jobTemplateName, jobAdapter.jobConfig(jobExecutionContext))
+ .jobId();
+ }
+
+ /** Closes the underlying Gravitino client. */
+ @Override
+ public void close() throws Exception {
+ if (gravitinoClient != null) {
+ gravitinoClient.close();
+ }
+ }
+
+ @VisibleForTesting
+ GravitinoJobAdapter loadJobAdapter(String jobTemplateName) {
+ Class<? extends GravitinoJobAdapter> jobAdapterClz =
jobAdapters.get(jobTemplateName);
+ if (jobAdapterClz == null) {
+ throw new IllegalArgumentException("No job adapter found for template: "
+ jobTemplateName);
+ }
+ try {
+ return jobAdapterClz.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create job adapter for template: " + jobTemplateName, e);
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/NoopJobSubmitter.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/NoopJobSubmitter.java
new file mode 100644
index 0000000000..5c101bf597
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/NoopJobSubmitter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Job submitter that logs requests without submitting any jobs. */
+public class NoopJobSubmitter implements JobSubmitter {
+ private final Logger LOG = LoggerFactory.getLogger(NoopJobSubmitter.class);
+
+ public static final String NAME = "noop-job-submitter";
+
+ /**
+ * Logs the job submission request and returns an empty job id.
+ *
+ * @param jobTemplateName job template name
+ * @param jobExecutionContext job execution context
+ * @return empty job id
+ */
+ @Override
+ public String submitJob(String jobTemplateName, JobExecutionContext
jobExecutionContext) {
+ LOG.info(
+ "NoopJobSubmitter submitJob: template={}, identifier={},
jobExecuteContext={}",
+ jobTemplateName,
+ jobExecutionContext.nameIdentifier(),
+ jobExecutionContext);
+ return "";
+ }
+
+ /**
+ * Returns the provider name for configuration lookup.
+ *
+ * @return provider name
+ */
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /**
+ * No-op initialization hook.
+ *
+ * @param optimizerEnv optimizer environment
+ */
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ /**
+ * No-op close hook.
+ *
+ * @throws Exception never thrown
+ */
+ @Override
+ public void close() throws Exception {}
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
new file mode 100644
index 0000000000..394691d286
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.statistics;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.SupportTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.Statistic;
+
+/** Statistics provider that reads table and partition statistics from
Gravitino. */
+public class GravitinoStatisticsProvider implements SupportTableStatistics {
+
+ public static final String NAME = "gravitino-statistics-provider";
+ private GravitinoClient gravitinoClient;
+
+ /**
+ * Initializes the provider with a Gravitino client derived from the
optimizer configuration.
+ *
+ * @param optimizerEnv optimizer environment
+ */
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
+ }
+
+ /**
+ * Returns table-level statistics for the given table identifier.
+ *
+ * @param tableIdentifier fully qualified table identifier
+ * @return list of statistics entries
+ */
+ @Override
+ public List<StatisticEntry<?>> tableStatistics(NameIdentifier
tableIdentifier) {
+ IdentifierUtils.requireTableIdentifierNormalized(tableIdentifier);
+ Table t =
+ gravitinoClient
+
.loadCatalog(IdentifierUtils.getCatalogNameFromTableIdentifier(tableIdentifier))
+ .asTableCatalog()
+
.loadTable(IdentifierUtils.removeCatalogFromIdentifier(tableIdentifier));
+ List<Statistic> statistics = t.supportsStatistics().listStatistics();
+ return statistics.stream()
+ .filter(statistic -> statistic.value().isPresent())
+ .map(
+ statistic ->
+ (StatisticEntry<?>)
+ new StatisticEntryImpl(statistic.name(),
statistic.value().get()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Returns partition-level statistics for the given table identifier.
+ *
+ * @param tableIdentifier fully qualified table identifier
+ * @return statistics grouped by partition path
+ */
+ @Override
+ public Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics(
+ NameIdentifier tableIdentifier) {
+ IdentifierUtils.requireTableIdentifierNormalized(tableIdentifier);
+ Table t =
+ gravitinoClient
+
.loadCatalog(IdentifierUtils.getCatalogNameFromTableIdentifier(tableIdentifier))
+ .asTableCatalog()
+
.loadTable(IdentifierUtils.removeCatalogFromIdentifier(tableIdentifier));
+ List<PartitionStatistics> partitionStatistics =
+
t.supportsPartitionStatistics().listPartitionStatistics(PartitionRange.ALL_PARTITIONS);
+
+ Map<PartitionPath, List<StatisticEntry<?>>> statisticsByPartition = new
LinkedHashMap<>();
+ partitionStatistics.forEach(
+ statistic -> toPartitionStatistics(statistic, statisticsByPartition));
+ return statisticsByPartition;
+ }
+
+ private void toPartitionStatistics(
+ PartitionStatistics partitionStatistics,
+ Map<PartitionPath, List<StatisticEntry<?>>> statisticsByPartition) {
+ PartitionPath partitions =
+
PartitionUtils.decodePartitionPath(partitionStatistics.partitionName());
+ Arrays.stream(partitionStatistics.statistics())
+ .filter(statistic -> statistic.value().isPresent())
+ .forEach(
+ statistic ->
+ statisticsByPartition
+ .computeIfAbsent(partitions, key -> new
java.util.ArrayList<>())
+ .add(new StatisticEntryImpl<>(statistic.name(),
statistic.value().get())));
+ }
+
+ /**
+ * Returns the provider name for configuration lookup.
+ *
+ * @return provider name
+ */
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /**
+ * Closes the underlying Gravitino client.
+ *
+ * @throws Exception if closing fails
+ */
+ @Override
+ public void close() throws Exception {
+ if (gravitinoClient != null) {
+ gravitinoClient.close();
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
new file mode 100644
index 0000000000..0dfb665c17
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.strategy;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import org.apache.gravitino.policy.Policy;
+import org.apache.gravitino.policy.PolicyContent;
+
+/** Strategy implementation backed by a Gravitino policy. */
+public class GravitinoStrategy implements Strategy {
+
+ @VisibleForTesting public static final String STRATEGY_TYPE_KEY =
"strategy.type";
+
+ @VisibleForTesting public static final String JOB_TEMPLATE_NAME_KEY =
"job.template-name";
+
+ private static final String JOB_OPTIONS_PREFIX = "job.options.";
+
+ private final Policy policy;
+
+ /**
+ * Creates a strategy wrapper for the given policy.
+ *
+ * @param policy policy to wrap
+ */
+ public GravitinoStrategy(Policy policy) {
+ this.policy = policy;
+ }
+
+ /**
+ * Returns the strategy name.
+ *
+ * @return strategy name
+ */
+ @Override
+ public String name() {
+ return policy.name();
+ }
+
+ /**
+ * Returns the strategy type declared in policy properties.
+ *
+ * @return strategy type
+ */
+ @Override
+ public String strategyType() {
+ return policy.content().properties().get(STRATEGY_TYPE_KEY);
+ }
+
+ /**
+ * Returns policy properties as strategy properties.
+ *
+ * @return strategy properties
+ */
+ @Override
+ public Map<String, String> properties() {
+ return policy.content().properties();
+ }
+
+ /**
+ * Returns policy rules as strategy rules.
+ *
+ * @return strategy rules
+ */
+ @Override
+ public Map<String, Object> rules() {
+ PolicyContent content = policy.content();
+ Map<String, Object> rules = content.rules();
+ return rules == null ? Map.of() : rules;
+ }
+
+ /**
+ * Returns job options parsed from policy rules.
+ *
+ * @return job options
+ */
+ @Override
+ public Map<String, String> jobOptions() {
+ Map<String, String> jobOptions = new HashMap<>();
+ rules()
+ .forEach(
+ (key, value) -> {
+ if (key.startsWith(JOB_OPTIONS_PREFIX)) {
+ jobOptions.put(key.substring(JOB_OPTIONS_PREFIX.length()),
String.valueOf(value));
+ }
+ });
+ return jobOptions;
+ }
+
+ /**
+ * Returns the job template name for this strategy.
+ *
+ * @return job template name
+ * @throws IllegalArgumentException if the template name is not configured
+ */
+ @Override
+ public String jobTemplateName() {
+ return
Optional.ofNullable(policy.content().properties().get(JOB_TEMPLATE_NAME_KEY))
+ .orElseThrow(() -> new IllegalArgumentException("job.template-name is
not set"));
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategyProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategyProvider.java
new file mode 100644
index 0000000000..6ff802873f
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategyProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.strategy;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.NoSuchPolicyException;
+import org.apache.gravitino.exceptions.NotFoundException;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
+import org.apache.gravitino.policy.Policy;
+import org.apache.gravitino.rel.Table;
+
+/** Strategy provider that loads policies from Gravitino. */
+public class GravitinoStrategyProvider implements StrategyProvider {
+
+ public static final String NAME = "gravitino-strategy-provider";
+ private GravitinoClient gravitinoClient;
+
+ /**
+ * Initializes the provider with a Gravitino client derived from the
optimizer configuration.
+ *
+ * @param optimizerEnv optimizer environment
+ */
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
+ }
+
+ /**
+ * Returns the provider name for configuration lookup.
+ *
+ * @return provider name
+ */
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /**
+ * Lists strategies attached to the specified table identifier.
+ *
+ * @param nameIdentifier fully qualified table identifier
+ * @return list of strategies, possibly empty
+ */
+ @Override
+ public List<Strategy> strategies(NameIdentifier nameIdentifier) {
+ IdentifierUtils.requireTableIdentifierNormalized(nameIdentifier);
+ Table t =
+ gravitinoClient
+
.loadCatalog(IdentifierUtils.getCatalogNameFromTableIdentifier(nameIdentifier))
+ .asTableCatalog()
+
.loadTable(IdentifierUtils.removeCatalogFromIdentifier(nameIdentifier));
+ String[] policyNames = t.supportsPolicies().listPolicies();
+ List<Strategy> policies =
+ Arrays.stream(policyNames)
+ .map(t.supportsPolicies()::getPolicy)
+ .filter(Objects::nonNull)
+ .map(this::toStrategy)
+ .collect(Collectors.toList());
+ return policies;
+ }
+
+ /**
+ * Returns a strategy by name.
+ *
+ * @param strategyName strategy name
+ * @return strategy
+ * @throws NotFoundException if the strategy does not exist
+ */
+ @Override
+ public Strategy strategy(String strategyName) throws NotFoundException {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(strategyName), "strategyName must not be
blank");
+ try {
+ return toStrategy(gravitinoClient.getPolicy(strategyName));
+ } catch (NoSuchPolicyException e) {
+ throw new NotFoundException(e, "Strategy '%s' not found", strategyName);
+ }
+ }
+
+ private Strategy toStrategy(Policy policy) {
+ return new GravitinoStrategy(policy);
+ }
+
+ /**
+ * Closes the underlying Gravitino client.
+ *
+ * @throws Exception if closing fails
+ */
+ @Override
+ public void close() throws Exception {
+ if (gravitinoClient != null) {
+ gravitinoClient.close();
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/table/GravitinoTableMetadataProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/table/GravitinoTableMetadataProvider.java
new file mode 100644
index 0000000000..0813ffecee
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/table/GravitinoTableMetadataProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.table;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoClient;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
+import org.apache.gravitino.rel.Table;
+
+/** Table metadata provider backed by Gravitino catalog tables. */
+public class GravitinoTableMetadataProvider implements TableMetadataProvider {
+ public static final String NAME = "gravitino-table-metadata-provider";
+ private GravitinoClient gravitinoClient;
+
+ /**
+ * Initializes the provider with a Gravitino client derived from the
optimizer configuration.
+ *
+ * @param optimizerEnv optimizer environment
+ */
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
+ }
+
+ /**
+ * Loads table metadata for the given table identifier.
+ *
+ * @param tableIdentifier fully qualified table identifier
+ * @return table metadata
+ */
+ @Override
+ public Table tableMetadata(NameIdentifier tableIdentifier) {
+ IdentifierUtils.requireTableIdentifierNormalized(tableIdentifier);
+ return gravitinoClient
+
.loadCatalog(IdentifierUtils.getCatalogNameFromTableIdentifier(tableIdentifier))
+ .asTableCatalog()
+
.loadTable(IdentifierUtils.removeCatalogFromIdentifier(tableIdentifier));
+ }
+
+ /**
+ * Returns the provider name for configuration lookup.
+ *
+ * @return provider name
+ */
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /**
+ * Closes the underlying Gravitino client.
+ *
+ * @throws Exception if closing fails
+ */
+ @Override
+ public void close() throws Exception {
+ if (gravitinoClient != null) {
+ gravitinoClient.close();
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
new file mode 100644
index 0000000000..392e7b1f5a
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+
+/** Helpers for converting between Gravitino partition names and {@link
PartitionPath}. */
+public class PartitionUtils {
+ private static final TypeReference<List<Map<String, String>>>
PARTITION_PATH_TYPE =
+ new TypeReference<List<Map<String, String>>>() {};
+
+ private PartitionUtils() {}
+
+ /**
+ * Encodes a {@link PartitionPath} into a JSON string.
+ *
+ * <p>For example, a path with entries {@code p1=v1, p2=v2} is encoded as
{@code [{"p1":"v1"},
+ * {"p2":"v2"}]}.
+ *
+ * @param partitionPath partition path
+ * @return encoded JSON string
+ */
+ public static String encodePartitionPath(PartitionPath partitionPath) {
+ Preconditions.checkArgument(partitionPath != null, "partitionPath must not
be null");
+ List<PartitionEntry> entries = partitionPath.entries();
+ Preconditions.checkArgument(entries != null && !entries.isEmpty(),
"partitionPath is empty");
+
+ List<Map<String, String>> encoded = new ArrayList<>(entries.size());
+ for (PartitionEntry entry : entries) {
+ String name = entry.partitionName();
+ String value = entry.partitionValue();
+ Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName
cannot be blank");
+ Preconditions.checkArgument(StringUtils.isNotBlank(value),
"partitionValue cannot be blank");
+ Map<String, String> item = new LinkedHashMap<>(1);
+ item.put(name, value);
+ encoded.add(item);
+ }
+
+ try {
+ return JsonUtils.objectMapper().writeValueAsString(encoded);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to encode partition path", e);
+ }
+ }
+
+ /**
+ * Decodes a JSON-encoded partition path into a {@link PartitionPath}.
+ *
+ * <p>Example format: {@code [{"p1":"v1"},{"p2":"v2"}]}.
+ *
+ * @param encodedPartitionPath JSON string representing the partition path
+ * @return parsed partition path
+ */
+ public static PartitionPath decodePartitionPath(String encodedPartitionPath)
{
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(encodedPartitionPath), "encodedPartitionPath
must not be blank");
+ List<Map<String, String>> decoded;
+ try {
+ decoded = JsonUtils.objectMapper().readValue(encodedPartitionPath,
PARTITION_PATH_TYPE);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to decode partition path", e);
+ }
+ Preconditions.checkArgument(decoded != null && !decoded.isEmpty(),
"partitionPath is empty");
+
+ List<PartitionEntry> entries = new ArrayList<>(decoded.size());
+ for (Map<String, String> item : decoded) {
+ Preconditions.checkArgument(
+ item != null && item.size() == 1, "partition entry must contain one
key/value pair");
+ Map.Entry<String, String> kv = item.entrySet().iterator().next();
+ String name = kv.getKey();
+ String value = kv.getValue();
+ Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName
cannot be blank");
+ Preconditions.checkArgument(StringUtils.isNotBlank(value),
"partitionValue cannot be blank");
+ entries.add(new PartitionEntryImpl(name, value));
+ }
+
+ return PartitionPath.of(entries);
+ }
+}
diff --git
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
new file mode 100644
index 0000000000..665bd4a3ff
--- /dev/null
+++
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider
+org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider
+org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider
+org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter
+org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
new file mode 100644
index 0000000000..c84d6954c3
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.conf;
+
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestOptimizerConfig {
+
+ @Test
+ void testOptimizerConfigLoadsPropertiesCorrectly() {
+ Map<String, String> properties =
+ Map.of(
+ OptimizerConfig.GRAVITINO_URI, "http://example.com",
+ OptimizerConfig.GRAVITINO_METALAKE, "example-metalake",
+ OptimizerConfig.GRAVITINO_DEFAULT_CATALOG, "example-catalog");
+ OptimizerConfig config = new OptimizerConfig(properties);
+
+ Assertions.assertEquals("http://example.com",
config.get(OptimizerConfig.GRAVITINO_URI_CONFIG));
+ Assertions.assertEquals(
+ "example-metalake",
config.get(OptimizerConfig.GRAVITINO_METALAKE_CONFIG));
+ Assertions.assertEquals(
+ "example-catalog",
config.get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG));
+ }
+
+ @Test
+ void testOptimizerConfigHandlesMissingPropertiesGracefully() {
+ Map<String, String> properties = Map.of();
+ OptimizerConfig config = new OptimizerConfig(properties);
+
+ Assertions.assertEquals(
+ "http://localhost:8090",
config.get(OptimizerConfig.GRAVITINO_URI_CONFIG));
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> config.get(OptimizerConfig.GRAVITINO_METALAKE_CONFIG));
+
Assertions.assertNull(config.get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG));
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
new file mode 100644
index 0000000000..4b4958ff53
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestIdentifierUtils {
+
+ @Test
+ void testRemoveCatalogFromIdentifierThrowsWhenCatalogMissing() {
+ Namespace namespace = Namespace.of("singleLevel");
+ NameIdentifier identifier = NameIdentifier.of(namespace, "tableName");
+
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> IdentifierUtils.removeCatalogFromIdentifier(identifier));
+ }
+
+ @Test
+ void testRemoveCatalogFromIdentifierRemovesFirstLevelForTwoLevelNamespace() {
+ Namespace namespace = Namespace.of("catalog", "schema");
+ NameIdentifier identifier = NameIdentifier.of(namespace, "tableName");
+
+ NameIdentifier result =
IdentifierUtils.removeCatalogFromIdentifier(identifier);
+
+ Assertions.assertEquals("schema", result.namespace().levels()[0]);
+ Assertions.assertEquals("tableName", result.name());
+ }
+
+ @Test
+ void testRemoveCatalogFromIdentifierThrowsExceptionForInvalidNamespace() {
+ Namespace namespace = Namespace.of();
+ NameIdentifier identifier = NameIdentifier.of(namespace, "tableName");
+
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> IdentifierUtils.removeCatalogFromIdentifier(identifier));
+ }
+
+ @Test
+ void testGetCatalogNameFromTableIdentifierThrowsWhenCatalogMissing() {
+ Namespace namespace = Namespace.of("schema");
+ NameIdentifier identifier = NameIdentifier.of(namespace, "tableName");
+
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> IdentifierUtils.getCatalogNameFromTableIdentifier(identifier));
+ }
+
+ @Test
+ void
testGetCatalogNameFromTableIdentifierReturnsFirstLevelForMultiLevelNamespace() {
+ Namespace namespace = Namespace.of("catalog", "schema");
+ NameIdentifier identifier = NameIdentifier.of(namespace, "tableName");
+
+ String result =
IdentifierUtils.getCatalogNameFromTableIdentifier(identifier);
+
+ Assertions.assertEquals("catalog", result);
+ }
+
+ @Test
+ void
testGetCatalogNameFromTableIdentifierThrowsExceptionForInvalidNamespace() {
+ Namespace namespace = Namespace.of();
+ NameIdentifier identifier = NameIdentifier.of(namespace, "tableName");
+
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> IdentifierUtils.getCatalogNameFromTableIdentifier(identifier));
+ }
+
+ @Test
+ void testRequireTableIdentifierNormalizedSucceedsWhenCatalogPresent() {
+ NameIdentifier identifier = NameIdentifier.of("catalog", "schema",
"tableName");
+
+ Assertions.assertDoesNotThrow(
+ () -> IdentifierUtils.requireTableIdentifierNormalized(identifier));
+ }
+
+ @Test
+ void testRequireTableIdentifierNormalizedThrowsWhenCatalogMissing() {
+ NameIdentifier identifier = NameIdentifier.of("schema", "tableName");
+
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> IdentifierUtils.requireTableIdentifierNormalized(identifier));
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
new file mode 100644
index 0000000000..2855194509
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter;
+import
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
+import
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
+import
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestProviderUtils {
+
+ @Test
+ public void testCreateStrategyProviderInstance() {
+ StrategyProvider strategyProvider =
+
ProviderUtils.createStrategyProviderInstance(GravitinoStrategyProvider.NAME);
+ Assertions.assertNotNull(strategyProvider);
+ Assertions.assertTrue(strategyProvider instanceof
GravitinoStrategyProvider);
+ }
+
+ @Test
+ public void testCreateJobSubmitterInstance() {
+ JobSubmitter jobSubmitter =
+ ProviderUtils.createJobSubmitterInstance(GravitinoJobSubmitter.NAME);
+ Assertions.assertNotNull(jobSubmitter);
+ Assertions.assertTrue(jobSubmitter instanceof GravitinoJobSubmitter);
+
+ jobSubmitter =
ProviderUtils.createJobSubmitterInstance(NoopJobSubmitter.NAME);
+ Assertions.assertNotNull(jobSubmitter);
+ Assertions.assertTrue(jobSubmitter instanceof NoopJobSubmitter);
+ }
+
+ @Test
+ public void testCreateStatisticsProviderInstance() {
+ StatisticsProvider statisticsProvider =
+
ProviderUtils.createStatisticsProviderInstance(GravitinoStatisticsProvider.NAME);
+ Assertions.assertNotNull(statisticsProvider);
+ Assertions.assertTrue(statisticsProvider instanceof
GravitinoStatisticsProvider);
+ }
+
+ @Test
+ public void testCreateTableMetadataProviderInstance() {
+ TableMetadataProvider tableMetadataProvider =
+
ProviderUtils.createTableMetadataProviderInstance(GravitinoTableMetadataProvider.NAME);
+ Assertions.assertNotNull(tableMetadataProvider);
+ Assertions.assertTrue(tableMetadataProvider instanceof
GravitinoTableMetadataProvider);
+ }
+
+ // Updater/monitor providers removed for recommender-only scope.
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
new file mode 100644
index 0000000000..6fcfcbe4fd
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.gravitino.NameIdentifier;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestRecommenderOrdering {
+
+ @Test
+ void testRecommendForOneStrategyOrdersByScoreDescending() {
+ NameIdentifier tableA = NameIdentifier.of("db", "t1");
+ NameIdentifier tableB = NameIdentifier.of("db", "t2");
+ NameIdentifier tableC = NameIdentifier.of("db", "t3");
+
+ Map<NameIdentifier, Long> scores = Map.of(tableA, 10L, tableB, 50L,
tableC, 30L);
+
+ List<JobExecutionContext> jobs = orderByScore(scores);
+
+ Assertions.assertEquals(3, jobs.size(), "All tables should produce a job
context");
+ Assertions.assertEquals(
+ tableB, jobs.get(0).nameIdentifier(), "Highest score should come
first");
+ Assertions.assertEquals(tableC, jobs.get(1).nameIdentifier(), "Second
highest score expected");
+ Assertions.assertEquals(tableA, jobs.get(2).nameIdentifier(), "Lowest
score expected last");
+ }
+
+ private static List<JobExecutionContext> orderByScore(Map<NameIdentifier,
Long> scores) {
+ PriorityQueue<StrategyEvaluation> scoreQueue =
+ new PriorityQueue<>((a, b) -> Long.compare(b.score(), a.score()));
+ scores.forEach((identifier, score) ->
scoreQueue.add(createEvaluation(identifier, score)));
+
+ List<JobExecutionContext> ordered = new ArrayList<>(scoreQueue.size());
+ while (!scoreQueue.isEmpty()) {
+ ordered.add(scoreQueue.poll().jobExecutionContext());
+ }
+ return ordered;
+ }
+
+ private static StrategyEvaluation createEvaluation(NameIdentifier
identifier, long score) {
+ return new StrategyEvaluation() {
+ @Override
+ public long score() {
+ return score;
+ }
+
+ @Override
+ public JobExecutionContext jobExecutionContext() {
+ return new JobExecutionContext() {
+ @Override
+ public NameIdentifier nameIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public Map<String, String> jobConfig() {
+ return Map.of();
+ }
+
+ @Override
+ public String jobTemplateName() {
+ return "template";
+ }
+ };
+ }
+ };
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
new file mode 100644
index 0000000000..8479458b45
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestStrategyFiltering {
+
+ @Test
+ void testRecommendOnlyTablesWithStrategy() {
+ NameIdentifier tableWithPolicy = NameIdentifier.of("db", "t_with_policy");
+ NameIdentifier tableWithoutPolicy = NameIdentifier.of("db",
"t_without_policy");
+
+ StubStrategy compactionStrategy = new StubStrategy("strategy-1",
"COMPACTION", new HashMap<>());
+ StrategyProvider strategyProvider =
+ new StubStrategyProvider(
+ Map.of(
+ tableWithPolicy, List.of(compactionStrategy),
+ tableWithoutPolicy, List.of()),
+ Map.of(compactionStrategy.name(), compactionStrategy));
+
+ Map<String, List<NameIdentifier>> identifiersByStrategy =
+ groupIdentifiersByStrategyName(
+ List.of(tableWithPolicy, tableWithoutPolicy),
+ strategyProvider,
+ compactionStrategy.strategyType());
+
+ Assertions.assertEquals(
+ 1,
+ identifiersByStrategy.get(compactionStrategy.name()).size(),
+ "Only table with policy should be grouped");
+ Assertions.assertEquals(
+ tableWithPolicy,
+ identifiersByStrategy.get(compactionStrategy.name()).get(0),
+ "Wrong table grouped");
+ }
+
+ private static Map<String, List<NameIdentifier>>
groupIdentifiersByStrategyName(
+ List<NameIdentifier> identifiers, StrategyProvider strategyProvider,
String strategyType) {
+ Map<String, List<NameIdentifier>> identifiersByStrategyName = new
HashMap<>();
+ for (NameIdentifier identifier : identifiers) {
+ strategyProvider.strategies(identifier).stream()
+ .filter(strategy -> strategy.strategyType().equals(strategyType))
+ .forEach(
+ strategy ->
+ identifiersByStrategyName
+ .computeIfAbsent(strategy.name(), key -> new
java.util.ArrayList<>())
+ .add(identifier));
+ }
+ return identifiersByStrategyName;
+ }
+
+ private static final class StubStrategy implements Strategy {
+ private final String name;
+ private final String strategyType;
+ private final Map<String, Object> content;
+
+ StubStrategy(String name, String strategyType, Map<String, Object>
content) {
+ this.name = name;
+ this.strategyType = strategyType;
+ this.content = content;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String strategyType() {
+ return strategyType;
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return Map.of();
+ }
+
+ @Override
+ public Map<String, Object> rules() {
+ return content;
+ }
+
+ @Override
+ public Map<String, String> jobOptions() {
+ return Map.of();
+ }
+
+ @Override
+ public String jobTemplateName() {
+ return "template";
+ }
+ }
+
+ private static final class StubStrategyProvider implements StrategyProvider {
+ private final Map<NameIdentifier, List<Strategy>> strategiesByTable;
+ private final Map<String, Strategy> strategiesByName;
+
+ StubStrategyProvider(
+ Map<NameIdentifier, List<Strategy>> strategiesByTable, Map<String,
Strategy> strategies) {
+ this.strategiesByTable = strategiesByTable;
+ this.strategiesByName = strategies;
+ }
+
+ @Override
+ public String name() {
+ return "stub-strategy-provider";
+ }
+
+ @Override
+ public void initialize(
+ org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv
optimizerEnv) {}
+
+ @Override
+ public List<Strategy> strategies(NameIdentifier nameIdentifier) {
+ return strategiesByTable.getOrDefault(nameIdentifier, List.of());
+ }
+
+ @Override
+ public Strategy strategy(String strategyName) {
+ return strategiesByName.get(strategyName);
+ }
+
+ @Override
+ public void close() throws Exception {}
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestPartitionUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestPartitionUtils.java
new file mode 100644
index 0000000000..c8865d79d3
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestPartitionUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestPartitionUtils {
+
+ @Test
+ void testEncodePartitionPathEmitsJsonArray() {
+ List<PartitionEntry> entries =
+ List.of(new PartitionEntryImpl("p1", "v1"), new
PartitionEntryImpl("p2", "v2"));
+ PartitionPath path = PartitionPath.of(entries);
+
+ String encoded = PartitionUtils.encodePartitionPath(path);
+
+ Assertions.assertEquals("[{\"p1\":\"v1\"},{\"p2\":\"v2\"}]", encoded);
+ }
+
+ @Test
+ void testDecodePartitionPathParsesEntries() {
+ String encoded = "[{\"p1\":\"v1\"},{\"p2\":\"v2\"}]";
+
+ PartitionPath path = PartitionUtils.decodePartitionPath(encoded);
+
+ Assertions.assertEquals(2, path.entries().size());
+ Assertions.assertEquals("p1", path.entries().get(0).partitionName());
+ Assertions.assertEquals("v1", path.entries().get(0).partitionValue());
+ Assertions.assertEquals("p2", path.entries().get(1).partitionName());
+ Assertions.assertEquals("v2", path.entries().get(1).partitionValue());
+ }
+
+ @Test
+ void testDecodePartitionPathRejectsBlankInput() {
+ IllegalArgumentException ex =
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
PartitionUtils.decodePartitionPath(" "));
+ Assertions.assertEquals("encodedPartitionPath must not be blank",
ex.getMessage());
+ }
+
+ @Test
+ void testEncodeThenDecodePreservesEntries() {
+ List<PartitionEntry> entries =
+ List.of(new PartitionEntryImpl("p1", "v1"), new
PartitionEntryImpl("p2", "v2"));
+ PartitionPath original = PartitionPath.of(entries);
+
+ String encoded = PartitionUtils.encodePartitionPath(original);
+ PartitionPath decoded = PartitionUtils.decodePartitionPath(encoded);
+
+ Assertions.assertEquals(original, decoded);
+ }
+}