This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new a670864cfd  [#9653] feat(optimizer): add compaction strategy related 
impls  (#9904)
a670864cfd is described below

commit a670864cfd5009ef4005f5218114b925ad5bab0e
Author: FANNG <[email protected]>
AuthorDate: Thu Feb 12 12:28:05 2026 +0900

     [#9653] feat(optimizer): add compaction strategy related impls  (#9904)
    
    ### What changes were proposed in this pull request?
    
    * Adds QLExpress4 dependency for dynamic expression evaluation in
    strategy triggers and scoring
    * Implements compaction strategy handler with partition-aware
    recommendation logic
    * Adds partition utility methods for generating SQL WHERE clauses from
    partition metadata
    
    Example Output (CompactionJobAdapter) Given a job context for db.table
    with two partitions (a='x1', b='y1') and (a='x2', b='y2'), and job
    options
      {target-file-size-bytes: "1024"}:
    
    ```
  jobOptions() ->
      {
        "table": "db.table",
        "where": "(a = 'x1' AND b = 'y1') OR (a = 'x2' AND b = 'y2')",
        "options": "{\"target-file-size-bytes\", \"1024\"}"
      }
    ```
    
    ### Why are the changes needed?
    
    Fix: #9653
    
    ### Does this PR introduce _any_ user-facing change? no
    
    ### How was this patch tested?
    existing tests
---
 LICENSE.bin                                        |   1 +
 gradle/libs.versions.toml                          |   2 +
 maintenance/optimizer/build.gradle.kts             |  41 ++
 .../optimizer/api/common/PartitionStrategy.java    |  66 +++
 .../maintenance/optimizer/api/common/Strategy.java |   2 +-
 .../api/recommender/JobExecutionContext.java       |  10 +-
 .../api/recommender/StrategyEvaluation.java        |  26 +-
 .../optimizer/common/conf/OptimizerConfig.java     |  23 ++
 .../optimizer/recommender/Recommender.java         |  70 +++-
 .../handler/BaseExpressionStrategyHandler.java     | 285 +++++++++++++
 .../handler/StrategyEvaluationImpl.java}           |  34 +-
 .../handler/compaction/CompactionJobContext.java   |  57 +++
 .../compaction/CompactionStrategyHandler.java      |  69 ++++
 .../job/GravitinoCompactionJobAdapter.java         |  83 ++++
 .../recommender/job/GravitinoJobSubmitter.java     |  76 +++-
 .../optimizer/recommender/job/PartitionUtils.java  | 176 ++++++++
 .../statistics/GravitinoStatisticsProvider.java    |   3 +-
 .../recommender/strategy/GravitinoStrategy.java    |  62 ++-
 .../recommender/util/ExpressionEvaluator.java      |  49 +++
 .../recommender/util/QLExpressionEvaluator.java    | 112 ++++++
 .../util/StatisticsUtils.java}                     |  36 +-
 .../optimizer/recommender/util/StrategyUtils.java  |  65 +++
 .../optimizer/common/conf/TestOptimizerConfig.java |  16 +
 .../recommender/TestRecommenderOrdering.java       |  34 +-
 .../recommender/TestStrategyFiltering.java         |   3 +-
 .../compaction/CompactionStrategyForTest.java      |  68 ++++
 .../compaction/TestCompactionStrategyHandler.java  | 330 +++++++++++++++
 .../job/TestBuiltinIcebergRewriteDataFiles.java    | 441 +++++++++++++++++++++
 .../job/TestGravitinoCompactionJobAdapter.java     |  59 +++
 .../recommender/job/TestGravitinoJobSubmitter.java |  97 +++++
 .../recommender/job/TestPartitionUtils.java        | 220 ++++++++++
 .../util/TestQLExpressionEvaluator.java            | 170 ++++++++
 32 files changed, 2710 insertions(+), 76 deletions(-)

diff --git a/LICENSE.bin b/LICENSE.bin
index ab096465e9..0350cd952d 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -385,6 +385,7 @@
    Google FlatBuffers
    IPAddress
    Aliyun SDK OSS
+   QLExpress
 
    This product bundles various third-party components also under the
    Apache Software Foundation License 1.1
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 43cfd73d8d..d013347d58 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -137,6 +137,7 @@ ognl = "3.4.7"
 concurrent-trees = "2.6.0"
 jakarta-validation = "2.0.2"
 aspectj = "1.9.24"
+ql-expression = "4.0.3"
 
 [libraries]
 aspectj-aspectjrt = { group = "org.aspectj", name = "aspectjrt", version.ref = 
"aspectj" }
@@ -322,6 +323,7 @@ concurrent-trees = { group = 
"com.googlecode.concurrent-trees", name = "concurre
 jcasbin = { group='org.casbin', name='jcasbin', version.ref="jcasbin" }
 openlineage-java= { group = "io.openlineage", name = "openlineage-java", 
version.ref = "openlineage" }
 ognl = { group='ognl', name='ognl', version.ref="ognl" }
+ql-expression = { group='com.alibaba', name='qlexpress4', 
version.ref="ql-expression" }
 
 [bundles]
 log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", 
"log4j-12-api", "log4j-layout-template-json"]
diff --git a/maintenance/optimizer/build.gradle.kts 
b/maintenance/optimizer/build.gradle.kts
index 638dadbe3f..501b9ff3bb 100644
--- a/maintenance/optimizer/build.gradle.kts
+++ b/maintenance/optimizer/build.gradle.kts
@@ -24,6 +24,12 @@ plugins {
   id("idea")
 }
 
+val scalaVersion: String =
+  project.properties["scalaVersion"] as? String ?: 
extra["defaultScalaVersion"].toString()
+val sparkVersion: String = libs.versions.spark33.get()
+val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
+val icebergVersion: String = libs.versions.iceberg4connector.get()
+
 dependencies {
   implementation(project(":api"))
   implementation(project(":catalogs:catalog-common"))
@@ -39,12 +45,43 @@ dependencies {
   implementation(libs.jackson.databind)
   implementation(libs.jackson.annotations)
   implementation(libs.guava)
+  implementation(libs.ql.expression)
 
   annotationProcessor(libs.lombok)
   compileOnly(libs.lombok)
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
+  testImplementation(libs.mockito.core)
+  testImplementation("org.slf4j:slf4j-api:1.7.36")
+  testRuntimeOnly("org.slf4j:slf4j-simple:1.7.36")
+  testRuntimeOnly(
+    
"org.scala-lang.modules:scala-collection-compat_$scalaVersion:${libs.versions.scala.collection.compat.get()}"
+  )
+  testImplementation(
+    
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion"
+  ) {
+    exclude(group = "org.slf4j", module = "slf4j-api")
+    exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+    exclude(group = "org.slf4j", module = "slf4j-log4j12")
+  }
+  
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
 {
+    exclude(group = "org.slf4j", module = "slf4j-api")
+    exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+    exclude(group = "org.slf4j", module = "slf4j-log4j12")
+  }
+  
testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
+    exclude(group = "org.slf4j", module = "slf4j-api")
+    exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+    exclude(group = "org.slf4j", module = "slf4j-log4j12")
+  }
+  testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") 
{
+    exclude(group = "org.slf4j", module = "slf4j-api")
+    exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+    exclude(group = "org.slf4j", module = "slf4j-log4j12")
+  }
+  testImplementation(libs.testcontainers)
   testAnnotationProcessor(libs.lombok)
   testCompileOnly(libs.lombok)
 
@@ -86,6 +123,10 @@ tasks {
   }
 }
 
+configurations.testRuntimeClasspath {
+  exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j2-impl")
+}
+
 tasks.test {
   val skipITs = project.hasProperty("skipITs")
   if (skipITs) {
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
new file mode 100644
index 0000000000..d5cd5fb4ec
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * Strategy definition for partitioned tables.
+ *
+ * <p>Partition strategies provide additional configuration for aggregating 
partition scores and
+ * selecting the number of partitions to evaluate.
+ */
+@DeveloperApi
+public interface PartitionStrategy extends Strategy {
+
+  int DEFAULT_MAX_PARTITION_NUM = 100;
+
+  /**
+   * Partition table score aggregation mode.
+   *
+   * <p>Defaults to {@link ScoreMode#AVG}.
+   *
+   * @return score mode enum
+   */
+  default ScoreMode partitionTableScoreMode() {
+    return ScoreMode.AVG;
+  }
+
+  /**
+   * Maximum number of partitions to include in a table evaluation.
+   *
+   * <p>Defaults to {@value #DEFAULT_MAX_PARTITION_NUM}.
+   *
+   * @return max partition number
+   */
+  default int maxPartitionNum() {
+    return DEFAULT_MAX_PARTITION_NUM;
+  }
+
+  /** Partition table score aggregation mode. */
+  enum ScoreMode {
+    /** Average score of all partitions. */
+    AVG,
+    /** Maximum score of all partitions. */
+    MAX,
+    /** Sum score of all partitions. */
+    SUM,
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
index 0030e7d738..593b6e6f4b 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
@@ -27,7 +27,7 @@ import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandle
  * Strategy definition supplied by the control plane. The recommender pulls 
strategies from a {@link
  * 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider}, 
routes them to a
  * {@link StrategyHandler} by {@link #strategyType()}, and lets the handler 
interpret the remaining
- * fields as needed.
+ * fields as needed. For partition-specific behavior, implement {@link 
PartitionStrategy}.
  */
 @DeveloperApi
 public interface Strategy {
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
index ec95fcce66..ff6df4c604 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
@@ -38,14 +38,14 @@ public interface JobExecutionContext {
   NameIdentifier nameIdentifier();
 
   /**
-   * Free-form job configuration, such as engine parameters (e.g., target file 
size bytes).
+   * Free-form job options, such as engine parameters (e.g., target file size 
bytes).
    *
-   * <p>The {@code StrategyHandler} is free to add additional job 
configuration besides the job
-   * options specified in the strategy.
+   * <p>The {@code StrategyHandler} is free to add additional job options 
besides the job options
+   * specified in the strategy.
    *
-   * @return immutable map of configuration entries
+   * @return immutable map of option entries
    */
-  Map<String, String> jobConfig();
+  Map<String, String> jobOptions();
 
   /**
    * Job template name to resolve the job in the job submitter.
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
index d9965978ed..fa04dfe45d 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.maintenance.optimizer.api.recommender;
 
+import java.util.Optional;
+
 /**
  * Encapsulates the scored result and job execution context for a single 
strategy evaluation. The
  * recommender ranks evaluations by {@link #score()} before asking the {@link
@@ -27,12 +29,32 @@ package 
org.apache.gravitino.maintenance.optimizer.api.recommender;
  */
 public interface StrategyEvaluation {
 
+  /**
+   * Evaluation placeholder indicating that no execution should happen. It 
uses score {@code -1} and
+   * an empty job execution context.
+   */
+  StrategyEvaluation NO_EXECUTION =
+      new StrategyEvaluation() {
+        @Override
+        public long score() {
+          return -1L;
+        }
+
+        @Override
+        public Optional<JobExecutionContext> jobExecutionContext() {
+          return Optional.empty();
+        }
+      };
+
   /**
    * Score used to rank multiple recommendations of the same strategy. Higher 
wins; equal scores
    * preserve the evaluation order returned by the {@code StrategyHandler}.
    */
   long score();
 
-  /** Job execution context for this evaluation. */
-  JobExecutionContext jobExecutionContext();
+  /**
+   * Job execution context for this evaluation. Implementations return {@link 
Optional#empty()} when
+   * {@link #NO_EXECUTION} is used to signal that no job should be submitted.
+   */
+  Optional<JobExecutionContext> jobExecutionContext();
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index 3f62b85c17..b0890957ac 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -44,6 +44,8 @@ public class OptimizerConfig extends Config {
   public static final String GRAVITINO_METALAKE = OPTIMIZER_PREFIX + 
"gravitinoMetalake";
   public static final String GRAVITINO_DEFAULT_CATALOG =
       OPTIMIZER_PREFIX + "gravitinoDefaultCatalog";
+  public static final String JOB_ADAPTER_PREFIX = OPTIMIZER_PREFIX + 
"jobAdapter.";
+  public static final String JOB_SUBMITTER_CONFIG_PREFIX = OPTIMIZER_PREFIX + 
"jobSubmitterConfig.";
 
   private static final String RECOMMENDER_PREFIX = OPTIMIZER_PREFIX + 
"recommender.";
   private static final String STATISTICS_PROVIDER = RECOMMENDER_PREFIX + 
"statisticsProvider";
@@ -132,4 +134,25 @@ public class OptimizerConfig extends Config {
     super(false);
     loadFromMap(properties, k -> true);
   }
+
+  /**
+   * Returns job submitter custom config entries with the {@code
+   * gravitino.optimizer.jobSubmitterConfig.} prefix stripped.
+   *
+   * @return custom job submitter config map
+   */
+  public Map<String, String> jobSubmitterConfigs() {
+    return getConfigsWithPrefix(JOB_SUBMITTER_CONFIG_PREFIX);
+  }
+
+  public String getStrategyHandlerClassName(String strategyHandlerName) {
+    String configKey =
+        String.format(OPTIMIZER_PREFIX + "strategyHandler.%s.className", 
strategyHandlerName);
+    return configMap.get(configKey);
+  }
+
+  public String getJobAdapterClassName(String jobTemplateName) {
+    String configKey = String.format(JOB_ADAPTER_PREFIX + "%s.className", 
jobTemplateName);
+    return configMap.get(configKey);
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
index c5a142293a..0602e7f286 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
@@ -79,6 +78,7 @@ public class Recommender implements AutoCloseable {
   private final TableMetadataProvider tableMetadataProvider;
   private final JobSubmitter jobSubmitter;
   private final CloseableGroup closeableGroup = new CloseableGroup();
+  private final OptimizerEnv optimizerEnv;
 
   /**
    * Create a recommender whose providers and submitter are resolved from the 
optimizer
@@ -93,6 +93,7 @@ public class Recommender implements AutoCloseable {
     TableMetadataProvider tableMetadataProvider = 
loadTableMetadataProvider(config);
     JobSubmitter jobSubmitter = loadJobSubmitter(config);
 
+    this.optimizerEnv = optimizerEnv;
     this.strategyProvider = strategyProvider;
     this.statisticsProvider = statisticsProvider;
     this.tableMetadataProvider = tableMetadataProvider;
@@ -111,7 +112,10 @@ public class Recommender implements AutoCloseable {
       StrategyProvider strategyProvider,
       StatisticsProvider statisticsProvider,
       TableMetadataProvider tableMetadataProvider,
-      JobSubmitter jobSubmitter) {
+      JobSubmitter jobSubmitter,
+      OptimizerEnv optimizerEnv) {
+
+    this.optimizerEnv = optimizerEnv;
     this.strategyProvider = strategyProvider;
     this.statisticsProvider = statisticsProvider;
     this.tableMetadataProvider = tableMetadataProvider;
@@ -139,12 +143,25 @@ public class Recommender implements AutoCloseable {
 
     for (Map.Entry<String, List<NameIdentifier>> entry : 
identifiersByStrategyName.entrySet()) {
       String strategyName = entry.getKey();
-      List<JobExecutionContext> jobConfigs =
+      List<StrategyEvaluation> evaluations =
           recommendForOneStrategy(entry.getValue(), strategyName);
-      for (JobExecutionContext jobConfig : jobConfigs) {
-        String templateName = jobConfig.jobTemplateName();
-        String jobId = jobSubmitter.submitJob(templateName, jobConfig);
-        LOG.info("Submit job {} for strategy {} with context {}", jobId, 
strategyName, jobConfig);
+      for (StrategyEvaluation evaluation : evaluations) {
+        JobExecutionContext jobExecutionContext =
+            evaluation
+                .jobExecutionContext()
+                .orElseThrow(
+                    () ->
+                        new IllegalStateException(
+                            "Job execution context is missing for evaluation 
of strategy "
+                                + strategyName));
+        String templateName = jobExecutionContext.jobTemplateName();
+        String jobId = jobSubmitter.submitJob(templateName, 
jobExecutionContext);
+        LOG.info(
+            "Submit job {} for strategy {} with context {}",
+            jobId,
+            strategyName,
+            jobExecutionContext);
+        logRecommendation(strategyName, evaluation);
       }
     }
   }
@@ -162,7 +179,7 @@ public class Recommender implements AutoCloseable {
     closeableGroup.register(jobSubmitter, "job submitter");
   }
 
-  private List<JobExecutionContext> recommendForOneStrategy(
+  private List<StrategyEvaluation> recommendForOneStrategy(
       List<NameIdentifier> identifiers, String strategyName) {
     LOG.info("Recommend strategy {} for identifiers {}", strategyName, 
identifiers);
     Strategy strategy = strategyProvider.strategy(strategyName);
@@ -175,6 +192,13 @@ public class Recommender implements AutoCloseable {
         continue;
       }
       StrategyEvaluation evaluation = strategyHandler.evaluate();
+      if (evaluation.score() < 0 || 
evaluation.jobExecutionContext().isEmpty()) {
+        LOG.info(
+            "Skip strategy {} for identifier {} because evaluation score is 
negative or job execution context is missing",
+            strategyName,
+            identifier);
+        continue;
+      }
       LOG.info(
           "Recommend strategy {} for identifier {} score: {}",
           strategyName,
@@ -183,9 +207,11 @@ public class Recommender implements AutoCloseable {
       scoreQueue.add(evaluation);
     }
 
-    return scoreQueue.stream()
-        .map(StrategyEvaluation::jobExecutionContext)
-        .collect(Collectors.toList());
+    List<StrategyEvaluation> results = new ArrayList<>();
+    while (!scoreQueue.isEmpty()) {
+      results.add(scoreQueue.poll());
+    }
+    return results;
   }
 
   private StrategyHandler loadStrategyHandler(Strategy strategy, 
NameIdentifier nameIdentifier) {
@@ -251,9 +277,8 @@ public class Recommender implements AutoCloseable {
    * by configuration or an explicit registry that maps stable strategy type 
strings (for example,
    * {@code COMPACTION}) to {@link StrategyHandler} implementations.
    */
-  @SuppressWarnings("UnusedVariable")
   private String getStrategyHandlerClassName(String strategyType) {
-    return "";
+    return optimizerEnv.config().getStrategyHandlerClassName(strategyType);
   }
 
   private Map<String, List<NameIdentifier>> getIdentifiersByStrategyName(
@@ -302,4 +327,23 @@ public class Recommender implements AutoCloseable {
                 + "configure a statistics provider that implements 
SupportTableStatistics.",
             statisticsProvider.name()));
   }
+
+  private void logRecommendation(String strategyName, StrategyEvaluation 
evaluation) {
+    JobExecutionContext jobExecutionContext =
+        evaluation
+            .jobExecutionContext()
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        "Job execution context is missing for evaluation of 
strategy "
+                            + strategyName));
+    System.out.println(
+        String.format(
+            "RECOMMEND: strategy=%s identifier=%s score=%d jobTemplate=%s 
jobOptions=%s",
+            strategyName,
+            jobExecutionContext.nameIdentifier(),
+            evaluation.score(),
+            jobExecutionContext.jobTemplateName(),
+            jobExecutionContext.jobOptions()));
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/BaseExpressionStrategyHandler.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/BaseExpressionStrategyHandler.java
new file mode 100644
index 0000000000..5c0aa59663
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/BaseExpressionStrategyHandler.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.handler;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import lombok.Value;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy;
+import 
org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy.ScoreMode;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandler;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandlerContext;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.ExpressionEvaluator;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.QLExpressionEvaluator;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.StatisticsUtils;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import org.apache.gravitino.rel.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base strategy handler that provides common expression evaluation and 
statistics handling.
+ *
+ * <p>Subclasses supply strategy-specific logic while relying on the shared 
context, statistics
+ * normalization, and expression evaluation utilities.
+ */
+public abstract class BaseExpressionStrategyHandler implements StrategyHandler 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseExpressionStrategyHandler.class);
+  // Sort partitions by score descending (highest score first).
+  private static final Comparator<PartitionScore> PARTITION_SCORE_ORDER =
+      (a, b) -> Long.compare(b.score(), a.score());
+
+  private final ExpressionEvaluator expressionEvaluator;
+  private Strategy strategy;
+  private List<StatisticEntry<?>> tableStatistics;
+  private Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics;
+  private Table tableMetadata;
+  private NameIdentifier nameIdentifier;
+
+  /** Create a handler that evaluates expressions with the default QL 
evaluator. */
+  protected BaseExpressionStrategyHandler() {
+    this.expressionEvaluator = new QLExpressionEvaluator();
+  }
+
+  @Override
+  public void initialize(StrategyHandlerContext context) {
+    Preconditions.checkArgument(context.tableMetadata().isPresent(), "Table 
metadata is null");
+    this.tableMetadata = context.tableMetadata().get();
+    this.nameIdentifier = context.nameIdentifier();
+    this.strategy = context.strategy();
+    this.tableStatistics = context.tableStatistics();
+    this.partitionStatistics = context.partitionStatistics();
+  }
+
+  @Override
+  public boolean shouldTrigger() {
+    if (isPartitionTable()) {
+      return shouldTriggerForPartitionTable();
+    }
+    return shouldTriggerForNonPartitionTable();
+  }
+
+  @Override
+  public StrategyEvaluation evaluate() {
+    if (isPartitionTable()) {
+      return evaluateForPartitionTable();
+    }
+    return evaluateForNonPartitionTable();
+  }
+
+  /**
+   * Build the execution context for the selected partitions.
+   *
+   * @param nameIdentifier target table identifier
+   * @param strategy strategy being evaluated
+   * @param tableMetadata table metadata requested by the handler
+   * @param partitions selected partitions, empty for non-partitioned tables
+   * @param jobOptions job options derived from the strategy
+   * @return job execution context
+   */
+  protected abstract JobExecutionContext buildJobExecutionContext(
+      NameIdentifier nameIdentifier,
+      Strategy strategy,
+      Table tableMetadata,
+      List<PartitionPath> partitions,
+      Map<String, String> jobOptions);
+
+  private int maxPartitionNum() {
+    if (strategy instanceof PartitionStrategy) {
+      return ((PartitionStrategy) strategy).maxPartitionNum();
+    }
+    return PartitionStrategy.DEFAULT_MAX_PARTITION_NUM;
+  }
+
+  private boolean isPartitionTable() {
+    return tableMetadata.partitioning().length > 0;
+  }
+
+  private boolean shouldTriggerForPartitionTable() {
+    if (partitionStatistics.isEmpty()) {
+      LOG.info("No partition statistics available for table {}", 
nameIdentifier);
+      return false;
+    }
+    String triggerExpression = triggerExpression(strategy);
+    return partitionStatistics.values().stream()
+        .anyMatch(partitionStats -> evaluateBool(triggerExpression, 
partitionStats));
+  }
+
+  private boolean shouldTriggerForNonPartitionTable() {
+    return evaluateBool(triggerExpression(strategy), tableStatistics);
+  }
+
+  private StrategyEvaluation evaluateForNonPartitionTable() {
+    long score = evaluateLong(scoreExpression(strategy), tableStatistics);
+    if (score <= 0) {
+      return StrategyEvaluation.NO_EXECUTION;
+    }
+    JobExecutionContext jobContext =
+        buildJobExecutionContext(
+            nameIdentifier, strategy, tableMetadata, List.of(), 
strategy.jobOptions());
+    return new StrategyEvaluationImpl(score, jobContext);
+  }
+
+  /**
+   * Aggregate partition scores into a table score. The score mode is 
controlled by {@link
+   * 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy#PARTITION_TABLE_SCORE_MODE}
+   * and defaults to {@code avg}.
+   */
+  private long getTableScoreFromPartitions(List<PartitionScore> 
partitionScores) {
+    if (partitionScores.isEmpty()) {
+      return -1L;
+    }
+    ScoreMode scoreMode = partitionTableScoreMode();
+    switch (scoreMode) {
+      case SUM:
+        return partitionScores.stream().mapToLong(PartitionScore::score).sum();
+      case MAX:
+        return 
partitionScores.stream().mapToLong(PartitionScore::score).max().orElse(-1L);
+      case AVG:
+        return partitionScores.stream().mapToLong(PartitionScore::score).sum()
+            / partitionScores.size();
+      default:
+        LOG.warn(
+            "Unsupported partition table score mode '{}' for strategy {}, 
defaulting to avg",
+            scoreMode,
+            strategy.name());
+        return partitionScores.stream().mapToLong(PartitionScore::score).sum()
+            / partitionScores.size();
+    }
+  }
+
+  private StrategyEvaluation evaluateForPartitionTable() {
+    List<PartitionScore> partitionScores = 
getTopPartitionScores(maxPartitionNum());
+    if (partitionScores.isEmpty()) {
+      return StrategyEvaluation.NO_EXECUTION;
+    }
+    List<PartitionPath> partitions =
+        
partitionScores.stream().map(PartitionScore::partition).collect(Collectors.toList());
+    JobExecutionContext jobContext =
+        buildJobExecutionContext(
+            nameIdentifier, strategy, tableMetadata, partitions, 
strategy.jobOptions());
+    long tableScore = getTableScoreFromPartitions(partitionScores);
+    return new StrategyEvaluationImpl(tableScore, jobContext);
+  }
+
+  private ScoreMode partitionTableScoreMode() {
+    if (strategy instanceof PartitionStrategy) {
+      return ((PartitionStrategy) strategy).partitionTableScoreMode();
+    }
+    return ScoreMode.AVG;
+  }
+
  /**
   * Evaluates a numeric (score) expression against the given statistics.
   *
   * @param expression expression to evaluate
   * @param statistics statistics used to populate the expression context
   * @return the evaluated value, or {@code -1} when evaluation fails
   */
  private long evaluateLong(String expression, List<StatisticEntry<?>> statistics) {
    Map<String, Object> context = buildExpressionContext(strategy, statistics);
    try {
      return expressionEvaluator.evaluateLong(expression, context);
    } catch (RuntimeException e) {
      // Treat a broken expression as "no score" rather than failing the whole recommendation.
      LOG.warn("Failed to evaluate expression '{}' with context {}", expression, context, e);
      return -1L;
    }
  }
+
  /**
   * Evaluates a boolean (trigger) expression against the given statistics.
   *
   * @param expression expression to evaluate
   * @param statistics statistics used to populate the expression context
   * @return the expression result, or {@code false} when evaluation fails
   */
  private boolean evaluateBool(String expression, List<StatisticEntry<?>> statistics) {
    Map<String, Object> context = buildExpressionContext(strategy, statistics);
    try {
      return expressionEvaluator.evaluateBool(expression, context);
    } catch (RuntimeException e) {
      // A failing trigger expression disables execution instead of crashing the recommender.
      LOG.warn("Failed to evaluate expression '{}' with context {}", expression, context, e);
      return false;
    }
  }
+
+  private static Map<String, Object> buildExpressionContext(
+      Strategy strategy, List<StatisticEntry<?>> statistics) {
+    Map<String, Object> context = new HashMap<>();
+    context.putAll(StatisticsUtils.buildStatisticsContext(statistics));
+    strategy
+        .rules()
+        .forEach(
+            (k, v) -> {
+              try {
+                context.put(k, Long.parseLong(v.toString()));
+              } catch (NumberFormatException e) {
+                // Ignore non-numeric rule values when building numeric 
expression inputs.
+              }
+            });
+    return context;
+  }
+
  /** Shorthand for looking up the strategy's trigger expression. */
  private String triggerExpression(Strategy strategy) {
    return StrategyUtils.getTriggerExpression(strategy);
  }
+
  /** Shorthand for looking up the strategy's score expression. */
  private String scoreExpression(Strategy strategy) {
    return StrategyUtils.getScoreExpression(strategy);
  }
+
  /**
   * Return the highest-scoring partitions in descending order.
   *
   * @param limit max partitions to return
   * @return top partition scores, empty when none score above zero
   */
  private List<PartitionScore> getTopPartitionScores(int limit) {
    if (limit <= 0) {
      return List.of();
    }
    // Bounded top-k selection: the queue uses the reverse of PARTITION_SCORE_ORDER so that the
    // head is the current weakest candidate and can be evicted cheaply.
    PriorityQueue<PartitionScore> scoreQueue =
        new PriorityQueue<>(limit, PARTITION_SCORE_ORDER.reversed());
    partitionStatistics.forEach(
        (partitionPath, statistics) -> {
          // A partition must first satisfy the trigger expression, then score positively.
          boolean trigger = evaluateBool(triggerExpression(strategy), statistics);
          if (trigger) {
            long partitionScore = evaluateLong(scoreExpression(strategy), statistics);
            if (partitionScore > 0) {
              PartitionScore entry = new PartitionScore(partitionPath, partitionScore);
              if (scoreQueue.size() < limit) {
                scoreQueue.add(entry);
              } else if (scoreQueue.peek() != null && partitionScore > scoreQueue.peek().score()) {
                // Queue is full: replace the weakest entry only when this partition beats it.
                scoreQueue.poll();
                scoreQueue.add(entry);
              }
            }
          }
        });

    // Re-sort into PARTITION_SCORE_ORDER before returning (presumably descending by score —
    // confirm against the comparator's definition).
    return scoreQueue.stream().sorted(PARTITION_SCORE_ORDER).collect(Collectors.toList());
  }
+
  /** Immutable pair of a partition path and its evaluated score. */
  @Value
  @Accessors(fluent = true)
  private static final class PartitionScore {
    // Partition this score was computed for.
    PartitionPath partition;
    // Positive score produced by the strategy's score expression.
    long score;
  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/StrategyEvaluationImpl.java
similarity index 53%
copy from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
copy to 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/StrategyEvaluationImpl.java
index d9965978ed..6f7caac1d9 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/StrategyEvaluationImpl.java
@@ -17,22 +17,26 @@
  * under the License.
  */
 
-package org.apache.gravitino.maintenance.optimizer.api.recommender;
+package org.apache.gravitino.maintenance.optimizer.recommender.handler;
 
-/**
- * Encapsulates the scored result and job execution context for a single 
strategy evaluation. The
- * recommender ranks evaluations by {@link #score()} before asking the {@link
- * org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter 
JobSubmitter} to launch
- * work.
- */
-public interface StrategyEvaluation {
+import java.util.Optional;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;
+
+@RequiredArgsConstructor
+final class StrategyEvaluationImpl implements StrategyEvaluation {
+
+  @Accessors(fluent = true)
+  @Getter
+  private final long score;
 
-  /**
-   * Score used to rank multiple recommendations of the same strategy. Higher 
wins; equal scores
-   * preserve the evaluation order returned by the {@code StrategyHandler}.
-   */
-  long score();
+  private final JobExecutionContext jobExecutionContext;
 
-  /** Job execution context for this evaluation. */
-  JobExecutionContext jobExecutionContext();
+  @Override
+  public Optional<JobExecutionContext> jobExecutionContext() {
+    return Optional.ofNullable(jobExecutionContext);
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionJobContext.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionJobContext.java
new file mode 100644
index 0000000000..f53000c455
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionJobContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+
/**
 * Job execution context for compaction, carrying the table identity, schema, partitioning layout,
 * and the concrete partitions selected for compaction.
 */
@RequiredArgsConstructor
@ToString
public class CompactionJobContext implements JobExecutionContext {
  // Identifier of the table this compaction job targets.
  private final NameIdentifier name;
  // Options forwarded to the job template (e.g. target-file-size-bytes).
  private final Map<String, String> jobOptions;
  // Name of the Gravitino job template used to launch the compaction.
  private final String jobTemplateName;
  // Table columns, used to format partition filter literals.
  @Getter private final Column[] columns;
  // Table partitioning transforms, used to build partition filter expressions.
  @Getter private final Transform[] partitioning;
  // Partitions selected for compaction; empty means the whole table is compacted.
  @Getter private final List<PartitionPath> partitions;

  @Override
  public NameIdentifier nameIdentifier() {
    return name;
  }

  @Override
  public Map<String, String> jobOptions() {
    return jobOptions;
  }

  @Override
  public String jobTemplateName() {
    return jobTemplateName;
  }
}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java
new file mode 100644
index 0000000000..774e2a8119
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.BaseExpressionStrategyHandler;
+import org.apache.gravitino.rel.Table;
+
+/**
+ * Strategy handler that builds compaction job contexts from table metadata 
and strategy settings.
+ */
+public class CompactionStrategyHandler extends BaseExpressionStrategyHandler {
+
+  public static final String NAME = "compaction";
+
+  @Override
+  public Set<DataRequirement> dataRequirements() {
+    return EnumSet.of(
+        DataRequirement.TABLE_METADATA,
+        DataRequirement.TABLE_STATISTICS,
+        DataRequirement.PARTITION_STATISTICS);
+  }
+
+  @Override
+  public String strategyType() {
+    return NAME;
+  }
+
+  @Override
+  protected JobExecutionContext buildJobExecutionContext(
+      NameIdentifier nameIdentifier,
+      Strategy strategy,
+      Table tableMetadata,
+      List<PartitionPath> partitions,
+      Map<String, String> jobOptions) {
+    List<PartitionPath> resolvedPartitions = partitions == null ? List.of() : 
partitions;
+    return new CompactionJobContext(
+        nameIdentifier,
+        jobOptions,
+        strategy.jobTemplateName(),
+        tableMetadata.columns(),
+        tableMetadata.partitioning(),
+        resolvedPartitions);
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoCompactionJobAdapter.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoCompactionJobAdapter.java
new file mode 100644
index 0000000000..6ca9e67181
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoCompactionJobAdapter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.TreeMap;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+
+public class GravitinoCompactionJobAdapter implements GravitinoJobAdapter {
+
+  private static final ObjectMapper MAPPER =
+      new 
ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+
+  @Override
+  public Map<String, String> jobConfig(JobExecutionContext 
jobExecutionContext) {
+    Preconditions.checkArgument(
+        jobExecutionContext instanceof CompactionJobContext,
+        "jobExecutionContext must be CompactionJobExecutionContext");
+    CompactionJobContext jobContext = (CompactionJobContext) 
jobExecutionContext;
+    return ImmutableMap.of(
+        "table_identifier", getTableName(jobContext),
+        "where_clause", getWhereClause(jobContext),
+        "sort_order", "",
+        "strategy", "binpack",
+        "options", getOptions(jobContext));
+  }
+
+  private String getTableName(CompactionJobContext jobContext) {
+    return 
IdentifierUtils.removeCatalogFromIdentifier(jobContext.nameIdentifier()).toString();
+  }
+
+  private String getWhereClause(CompactionJobContext jobContext) {
+    if (jobContext.getPartitions().isEmpty()) {
+      return "";
+    }
+    // generate where clause from jobContext.partitionNames()
+    // 1. get partition column type
+    // 2. generate partition filter name, like day(xxx)
+    // 3. generate partition filter value, like '2023-10-01', TIMESTAMP 'xxx'
+    return PartitionUtils.getWhereClauseForPartitions(
+        jobContext.getPartitions(), jobContext.getColumns(), 
jobContext.getPartitioning());
+  }
+
+  private String getOptions(JobExecutionContext jobExecutionContext) {
+    Map<String, String> map = jobExecutionContext.jobOptions();
+    return convertMapToJson(map);
+  }
+
+  private static String convertMapToJson(Map<String, ?> map) {
+    if (map == null || map.isEmpty()) {
+      return "{}";
+    }
+    try {
+      return MAPPER.writeValueAsString(new TreeMap<>(map));
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException("Failed to serialize options to JSON", 
e);
+    }
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
index 455d51795d..07ee840ed4 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
@@ -20,12 +20,17 @@
 package org.apache.gravitino.maintenance.optimizer.recommender.job;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.client.GravitinoClient;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 import 
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
 
 /** Submits optimizer jobs to Gravitino using job template adapters. */
 public class GravitinoJobSubmitter implements JobSubmitter {
@@ -33,14 +38,17 @@ public class GravitinoJobSubmitter implements JobSubmitter {
   public static final String NAME = "gravitino-job-submitter";
 
   private GravitinoClient gravitinoClient;
-
-  private final Map<String, Class<? extends GravitinoJobAdapter>> jobAdapters 
= Map.of();
+  private OptimizerEnv optimizerEnv;
+  private OptimizerConfig optimizerConfig;
 
   /**
    * Returns the provider name for configuration lookup.
    *
    * @return provider name
    */
+  private final Map<String, Class<? extends GravitinoJobAdapter>> jobAdapters =
+      ImmutableMap.of(CompactionStrategyHandler.NAME, 
GravitinoCompactionJobAdapter.class);
+
   @Override
   public String name() {
     return NAME;
@@ -53,7 +61,8 @@ public class GravitinoJobSubmitter implements JobSubmitter {
    */
  @Override
  public void initialize(OptimizerEnv optimizerEnv) {
    // Capture env/config only; the Gravitino client is created lazily in ensureClientInitialized.
    this.optimizerEnv = optimizerEnv;
    this.optimizerConfig = optimizerEnv.config();
  }
 
   /**
@@ -65,9 +74,10 @@ public class GravitinoJobSubmitter implements JobSubmitter {
    */
  @Override
  public String submitJob(String jobTemplateName, JobExecutionContext jobExecutionContext) {
    // Client creation is deferred until the first submission.
    ensureClientInitialized();
    GravitinoJobAdapter jobAdapter = loadJobAdapter(jobTemplateName);
    // Merge submitter-level defaults with adapter-specific config, then run the job template.
    return gravitinoClient
        .runJob(jobTemplateName, buildJobConfig(optimizerConfig, jobExecutionContext, jobAdapter))
        .jobId();
  }
 
@@ -79,11 +89,67 @@ public class GravitinoJobSubmitter implements JobSubmitter {
     }
   }
 
  /**
   * Lazily creates the Gravitino client on first use.
   *
   * @throws IllegalStateException if {@code initialize} was never called
   */
  // NOTE(review): not synchronized — assumes submitJob is called from a single thread; confirm.
  private void ensureClientInitialized() {
    if (gravitinoClient == null) {
      if (optimizerEnv == null) {
        throw new IllegalStateException("Job submitter is not initialized");
      }
      this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
    }
  }
+
+  /**
+   * Merge job configs with precedence: optimizer config < adapter config.
+   *
+   * <p>Typical use cases:
+   *
+   * <ul>
+   *   <li>Optimizer config: shared engine/runtime defaults (for example, 
Spark settings).
+   *   <li>Adapter config: adapter-specific parameters (for example, WHERE 
filters) required by the
+   *       job template.
+   * </ul>
+   */
+  @VisibleForTesting
+  static Map<String, String> buildJobConfig(
+      OptimizerConfig optimizerConfig,
+      JobExecutionContext jobExecutionContext,
+      GravitinoJobAdapter jobAdapter) {
+    Map<String, String> submitterConfigs =
+        optimizerConfig == null ? Map.of() : 
optimizerConfig.jobSubmitterConfigs();
+    Map<String, String> adapterConfigs =
+        jobAdapter == null ? Map.of() : 
jobAdapter.jobConfig(jobExecutionContext);
+
+    Map<String, String> mergedConfigs = new LinkedHashMap<>();
+    mergedConfigs.putAll(submitterConfigs);
+    mergedConfigs.putAll(adapterConfigs);
+    return mergedConfigs;
+  }
+
   @VisibleForTesting
   GravitinoJobAdapter loadJobAdapter(String jobTemplateName) {
     Class<? extends GravitinoJobAdapter> jobAdapterClz = 
jobAdapters.get(jobTemplateName);
     if (jobAdapterClz == null) {
-      throw new IllegalArgumentException("No job adapter found for template: " 
+ jobTemplateName);
+      String jobAdapterClassName =
+          optimizerConfig == null ? null : 
optimizerConfig.getJobAdapterClassName(jobTemplateName);
+      if (StringUtils.isBlank(jobAdapterClassName)) {
+        throw new IllegalArgumentException("No job adapter found for template: 
" + jobTemplateName);
+      }
+      try {
+        Class<?> rawClass = Class.forName(jobAdapterClassName);
+        if (!GravitinoJobAdapter.class.isAssignableFrom(rawClass)) {
+          throw new IllegalArgumentException(
+              "Configured job adapter class does not implement 
GravitinoJobAdapter: "
+                  + jobAdapterClassName);
+        }
+        jobAdapterClz = rawClass.asSubclass(GravitinoJobAdapter.class);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Failed to load job adapter class '"
+                + jobAdapterClassName
+                + "' for template: "
+                + jobTemplateName,
+            e);
+      }
     }
     try {
       return jobAdapterClz.getDeclaredConstructor().newInstance();
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/PartitionUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/PartitionUtils.java
new file mode 100644
index 0000000000..1d6251ab40
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/PartitionUtils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Type;
+
+public class PartitionUtils {
+
+  public static String getWhereClauseForPartitions(
+      List<PartitionPath> partitions, Column[] columns, Transform[] 
partitioning) {
+    Preconditions.checkArgument(
+        partitions != null && !partitions.isEmpty(), "partitions cannot be 
null or empty");
+    Preconditions.checkArgument(ArrayUtils.isNotEmpty(columns), "columns 
cannot be null or empty");
+    Preconditions.checkArgument(
+        partitioning != null && partitioning.length > 0, "partitioning cannot 
be null or empty");
+
+    List<String> predicates = 
Lists.newArrayListWithExpectedSize(partitions.size());
+    for (PartitionPath partition : partitions) {
+      predicates.add(getWhereClauseForPartition(partition.entries(), columns, 
partitioning));
+    }
+    // Wrap each partition-level predicate in parentheses to preserve logical 
grouping
+    // e.g. "(col1 = v1 AND col2 = v2) OR (col1 = v3 AND col2 = v4)"
+    return predicates.stream().map(p -> "(" + p + 
")").collect(Collectors.joining(" OR "));
+  }
+
+  // For Identity transform, the where clause is like "columnName = value"
+  // For bucket transform, the where clause is like "bucket(columnName, 
numBuckets) = value"
+  // For truncate transform, the where clause is like "truncate(columnName, 
width) = value"
+  // For year/month/day/hour transform, the where clause is like 
"year(columnName) = value"
+  // We could get value from partition.get(i).partitionValue(), if the value 
type is string, we need
+  // to add quotes like "value",  if the value type is number, we don't need 
to add quotes. if the
+  // value type is date/datetime, we need to add quotes like TIMESTAMP 
'2024-01-01'
+  public static String getWhereClauseForPartition(
+      List<PartitionEntry> partition, Column[] columns, Transform[] 
partitioning) {
+    Preconditions.checkArgument(
+        partition != null && !partition.isEmpty(), "partition cannot be null 
or empty");
+    Preconditions.checkArgument(ArrayUtils.isNotEmpty(columns), "columns 
cannot be null or empty");
+    Preconditions.checkArgument(
+        partitioning != null && partitioning.length == partition.size(),
+        "partitioning must match the size of partition entries");
+
+    Map<String, Column> columnMap =
+        Arrays.stream(columns)
+            .collect(
+                Collectors.toMap(
+                    c -> c.name().toLowerCase(Locale.ROOT), 
Function.identity(), (l, r) -> l));
+
+    List<String> predicates = 
Lists.newArrayListWithExpectedSize(partition.size());
+    for (int i = 0; i < partition.size(); i++) {
+      PartitionEntry entry = partition.get(i);
+      Transform transform = partitioning[i];
+
+      String columnName = getColumnName(transform);
+      Column column =
+          Preconditions.checkNotNull(
+              columnMap.get(columnName.toLowerCase(Locale.ROOT)),
+              "Column '%s' not found in table schema",
+              columnName);
+
+      String expression = buildExpression(transform, columnName);
+      String literal = formatLiteral(transform, column, 
entry.partitionValue());
+      predicates.add(String.format("%s = %s", expression, literal));
+    }
+
+    return String.join(" AND ", predicates);
+  }
+
+  private static String getColumnName(Transform transform) {
+    if (transform instanceof Transform.SingleFieldTransform) {
+      return joinFieldName(((Transform.SingleFieldTransform) 
transform).fieldName());
+    } else if (transform instanceof Transforms.TruncateTransform) {
+      return joinFieldName(((Transforms.TruncateTransform) 
transform).fieldName());
+    } else if (transform instanceof Transforms.BucketTransform) {
+      String[][] fieldNames = ((Transforms.BucketTransform) 
transform).fieldNames();
+      Preconditions.checkArgument(
+          fieldNames.length > 0, "Bucket transform must have at least one 
field");
+      return joinFieldName(fieldNames[0]);
+    }
+    throw new IllegalArgumentException("Unsupported transform: " + 
transform.getClass().getName());
+  }
+
+  private static String buildExpression(Transform transform, String 
columnName) {
+    if (transform instanceof Transforms.IdentityTransform) {
+      return columnName;
+    } else if (transform instanceof Transforms.YearTransform
+        || transform instanceof Transforms.MonthTransform
+        || transform instanceof Transforms.DayTransform
+        || transform instanceof Transforms.HourTransform) {
+      return transform.name() + "(" + columnName + ")";
+    } else if (transform instanceof Transforms.BucketTransform) {
+      return "bucket("
+          + columnName
+          + ", "
+          + ((Transforms.BucketTransform) transform).numBuckets()
+          + ")";
+    } else if (transform instanceof Transforms.TruncateTransform) {
+      return "truncate("
+          + columnName
+          + ", "
+          + ((Transforms.TruncateTransform) transform).width()
+          + ")";
+    }
+    throw new IllegalArgumentException("Unsupported transform: " + 
transform.getClass().getName());
+  }
+
+  private static String formatLiteral(Transform transform, Column column, 
String rawValue) {
+    if (transform instanceof Transforms.YearTransform
+        || transform instanceof Transforms.MonthTransform
+        || transform instanceof Transforms.DayTransform
+        || transform instanceof Transforms.HourTransform
+        || transform instanceof Transforms.BucketTransform) {
+      return rawValue;
+    }
+
+    Type type = column.dataType();
+    switch (type.name()) {
+      case BOOLEAN:
+        return rawValue.toLowerCase(Locale.ROOT);
+      case BYTE:
+      case SHORT:
+      case INTEGER:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DECIMAL:
+        return rawValue;
+      case DATE:
+        return "DATE '" + rawValue + "'";
+      case TIMESTAMP:
+        return "TIMESTAMP '" + rawValue + "'";
+      case TIME:
+        return "TIME '" + rawValue + "'";
+      default:
+        return "\"" + escapeStringLiteral(rawValue) + "\"";
+    }
+  }
+
+  private static String escapeStringLiteral(String value) {
+    return value.replace("\\", "\\\\").replace("\"", "\\\"");
+  }
+
+  private static String joinFieldName(String[] fieldNameParts) {
+    return String.join(".", fieldNameParts);
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
index 394691d286..76bb7ac8db 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.recommender.statistics;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -113,7 +114,7 @@ public class GravitinoStatisticsProvider implements 
SupportTableStatistics {
         .forEach(
             statistic ->
                 statisticsByPartition
-                    .computeIfAbsent(partitions, key -> new 
java.util.ArrayList<>())
+                    .computeIfAbsent(partitions, key -> new ArrayList<>())
                     .add(new StatisticEntryImpl<>(statistic.name(), 
statistic.value().get())));
   }
 
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
index 0dfb665c17..a66bb43b7a 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
@@ -23,18 +23,29 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy;
+import 
org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy.ScoreMode;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.PolicyContent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Strategy implementation backed by a Gravitino policy. */
-public class GravitinoStrategy implements Strategy {
+public class GravitinoStrategy implements PartitionStrategy {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoStrategy.class);
 
   @VisibleForTesting public static final String STRATEGY_TYPE_KEY = 
"strategy.type";
 
   @VisibleForTesting public static final String JOB_TEMPLATE_NAME_KEY = 
"job.template-name";
 
   private static final String JOB_OPTIONS_PREFIX = "job.options.";
+  /** Rule key for the partition table score aggregation mode. */
+  public static final String PARTITION_TABLE_SCORE_MODE = 
"partition_table_score_mode";
+  /** Rule key for the maximum number of partitions selected for execution. */
+  public static final String MAX_PARTITION_NUM = "max_partition_num";
+
+  private static final int DEFAULT_MAX_PARTITION_NUM = 100;
 
   private final Policy policy;
 
@@ -118,4 +129,51 @@ public class GravitinoStrategy implements Strategy {
     return 
Optional.ofNullable(policy.content().properties().get(JOB_TEMPLATE_NAME_KEY))
         .orElseThrow(() -> new IllegalArgumentException("job.template-name is 
not set"));
   }
+
+  @Override
+  public ScoreMode partitionTableScoreMode() {
+    Object value = rules().get(PARTITION_TABLE_SCORE_MODE);
+    if (value == null) {
+      return ScoreMode.AVG;
+    }
+    if (value instanceof ScoreMode) {
+      return (ScoreMode) value;
+    }
+    String mode = value.toString().trim().toLowerCase();
+    if (mode.isEmpty()) {
+      return ScoreMode.AVG;
+    }
+    switch (mode) {
+      case "sum":
+        return ScoreMode.SUM;
+      case "max":
+        return ScoreMode.MAX;
+      case "avg":
+        return ScoreMode.AVG;
+      default:
+        LOG.warn(
+            "Unsupported partition table score mode '{}' for strategy {}, 
defaulting to avg",
+            mode,
+            name());
+        return ScoreMode.AVG;
+    }
+  }
+
+  @Override
+  public int maxPartitionNum() {
+    Object value = rules().get(MAX_PARTITION_NUM);
+    if (value == null) {
+      return DEFAULT_MAX_PARTITION_NUM;
+    }
+    String limit = value.toString().trim();
+    if (limit.isEmpty()) {
+      return DEFAULT_MAX_PARTITION_NUM;
+    }
+    try {
+      int parsed = Integer.parseInt(limit);
+      return parsed > 0 ? parsed : DEFAULT_MAX_PARTITION_NUM;
+    } catch (NumberFormatException e) {
+      return DEFAULT_MAX_PARTITION_NUM;
+    }
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/ExpressionEvaluator.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/ExpressionEvaluator.java
new file mode 100644
index 0000000000..71d8a4d6e1
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/ExpressionEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import java.util.Map;
+
/**
 * Contract for evaluating rule expressions against a set of named variables.
 *
 * <p>The {@code context} map supplies variable bindings: each key is treated as a variable name
 * visible to the expression, and each value is its bound value. Callers must provide every
 * variable the expression references.
 */
public interface ExpressionEvaluator {

  /**
   * Evaluates an expression expected to produce a boolean result.
   *
   * @param expression the expression text to evaluate
   * @param context variable name to value bindings available to the expression
   * @return the boolean result of the expression
   */
  boolean evaluateBool(String expression, Map<String, Object> context);

  /**
   * Evaluates an expression expected to produce a numeric result, coerced to {@code long}.
   *
   * @param expression the expression text to evaluate
   * @param context variable name to value bindings available to the expression
   * @return the numeric result of the expression as a {@code long}
   */
  long evaluateLong(String expression, Map<String, Object> context);
}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/QLExpressionEvaluator.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/QLExpressionEvaluator.java
new file mode 100644
index 0000000000..cd7609ebca
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/QLExpressionEvaluator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import com.alibaba.qlexpress4.Express4Runner;
+import com.alibaba.qlexpress4.InitOptions;
+import com.alibaba.qlexpress4.QLOptions;
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
+public class QLExpressionEvaluator implements ExpressionEvaluator {
+  private static final Express4Runner RUNNER = new 
Express4Runner(InitOptions.DEFAULT_OPTIONS);
+
+  @Override
+  public long evaluateLong(String expression, Map<String, Object> context) {
+    return toLong(evaluate(expression, context));
+  }
+
+  @Override
+  public boolean evaluateBool(String expression, Map<String, Object> context) {
+    return (boolean) evaluate(expression, context);
+  }
+
+  private Object evaluate(String expression, Map<String, Object> context) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(expression), 
"expression is blank");
+    Preconditions.checkArgument(context != null, "context is null");
+    String formattedExpression = formatExpression(expression, context);
+    return RUNNER
+        .execute(formattedExpression, formatContextKey(context), 
QLOptions.DEFAULT_OPTIONS)
+        .getResult();
+  }
+
+  private Map<String, Object> formatContextKey(Map<String, Object> context) {
+    return context.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> normalizeIdentifier(entry.getKey()), entry -> 
entry.getValue()));
+  }
+
+  private String formatExpression(String expression, Map<String, Object> 
context) {
+    Map<String, String> replacements =
+        context.keySet().stream()
+            .collect(
+                Collectors.toMap(key -> key, this::normalizeIdentifier, (left, 
right) -> left));
+    replacements.entrySet().removeIf(entry -> 
entry.getKey().equals(entry.getValue()));
+    if (replacements.isEmpty()) {
+      return expression;
+    }
+
+    String alternation =
+        
replacements.keySet().stream().map(Pattern::quote).collect(Collectors.joining("|"));
+    Pattern pattern = Pattern.compile("(?<![A-Za-z0-9_])(" + alternation + 
")(?![A-Za-z0-9_])");
+    Matcher matcher = pattern.matcher(expression);
+    StringBuffer buffer = new StringBuffer();
+    while (matcher.find()) {
+      String matched = matcher.group(1);
+      matcher.appendReplacement(
+          buffer, Matcher.quoteReplacement(replacements.getOrDefault(matched, 
matched)));
+    }
+    matcher.appendTail(buffer);
+    return buffer.toString();
+  }
+
+  private String normalizeIdentifier(String name) {
+    return name.replace("-", "_");
+  }
+
+  private Long toLong(Object obj) {
+    if (obj instanceof Long) {
+      return (Long) obj;
+    }
+
+    if (obj instanceof Integer) {
+      return ((Integer) obj).longValue();
+    }
+
+    if (obj instanceof BigDecimal) {
+      return ((BigDecimal) obj).longValue();
+    }
+
+    if (obj instanceof Number) {
+      if (obj instanceof Double || obj instanceof Float) {
+        return Math.round(((Number) obj).doubleValue());
+      }
+      return ((Number) obj).longValue();
+    }
+
+    throw new IllegalArgumentException("Object cannot be converted to Long");
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StatisticsUtils.java
similarity index 52%
copy from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
copy to 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StatisticsUtils.java
index d9965978ed..2fa683646b 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StatisticsUtils.java
@@ -17,22 +17,28 @@
  * under the License.
  */
 
-package org.apache.gravitino.maintenance.optimizer.api.recommender;
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
 
-/**
- * Encapsulates the scored result and job execution context for a single 
strategy evaluation. The
- * recommender ranks evaluations by {@link #score()} before asking the {@link
- * org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter 
JobSubmitter} to launch
- * work.
- */
-public interface StrategyEvaluation {
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+public class StatisticsUtils {
 
-  /**
-   * Score used to rank multiple recommendations of the same strategy. Higher 
wins; equal scores
-   * preserve the evaluation order returned by the {@code StrategyHandler}.
-   */
-  long score();
+  private StatisticsUtils() {}
 
-  /** Job execution context for this evaluation. */
-  JobExecutionContext jobExecutionContext();
+  public static Map<String, Object> buildStatisticsContext(
+      List<StatisticEntry<?>> tableStatistics) {
+    Map<String, Object> context = new HashMap<>();
+    if (tableStatistics == null) {
+      return context;
+    }
+    for (StatisticEntry<?> statistic : tableStatistics) {
+      if (statistic != null && statistic.name() != null) {
+        context.put(statistic.name(), statistic.value().value());
+      }
+    }
+    return context;
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StrategyUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StrategyUtils.java
new file mode 100644
index 0000000000..93106f22ff
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StrategyUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+
+/** Utility methods and rule keys for interpreting optimizer strategies. */
+public class StrategyUtils {
+
+  /** Prefix for job option keys exposed as rules. */
+  public static final String JOB_ROLE_PREFIX = "job.";
+  /** Rule key for the trigger expression. */
+  public static final String TRIGGER_EXPR = "trigger-expr";
+  /** Rule key for the score expression. */
+  public static final String SCORE_EXPR = "score-expr";
+
+  private static final String DEFAULT_TRIGGER_EXPR = "false";
+  private static final String DEFAULT_SCORE_EXPR = "-1";
+  /**
+   * Resolve the trigger expression for a strategy.
+   *
+   * @param strategy strategy definition
+   * @return trigger expression or {@code false} by default
+   */
+  public static String getTriggerExpression(Strategy strategy) {
+    Object value = strategy.rules().get(TRIGGER_EXPR);
+    if (value == null) {
+      return DEFAULT_TRIGGER_EXPR;
+    }
+    String expression = value.toString();
+    return expression.trim().isEmpty() ? DEFAULT_TRIGGER_EXPR : expression;
+  }
+
+  /**
+   * Resolve the score expression for a strategy.
+   *
+   * @param strategy strategy definition
+   * @return score expression or {@code -1} by default
+   */
+  public static String getScoreExpression(Strategy strategy) {
+    Object value = strategy.rules().get(SCORE_EXPR);
+    if (value == null) {
+      return DEFAULT_SCORE_EXPR;
+    }
+    String expression = value.toString();
+    return expression.trim().isEmpty() ? DEFAULT_SCORE_EXPR : expression;
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
index c84d6954c3..903e62d77e 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
@@ -53,4 +53,20 @@ class TestOptimizerConfig {
         () -> config.get(OptimizerConfig.GRAVITINO_METALAKE_CONFIG));
     
Assertions.assertNull(config.get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG));
   }
+
+  @Test
+  void testJobSubmitterConfigsWithPrefix() {
+    Map<String, String> properties =
+        Map.of(
+            OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark.master",
+            "yarn",
+            OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "queue",
+            "default",
+            OptimizerConfig.GRAVITINO_URI,
+            "http://example.com";);
+    OptimizerConfig config = new OptimizerConfig(properties);
+
+    Assertions.assertEquals(
+        Map.of("spark.master", "yarn", "queue", "default"), 
config.jobSubmitterConfigs());
+  }
 }
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
index 6fcfcbe4fd..c49695d4b6 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
@@ -22,6 +22,7 @@ package 
org.apache.gravitino.maintenance.optimizer.recommender;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import org.apache.gravitino.NameIdentifier;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
@@ -55,7 +56,7 @@ class TestRecommenderOrdering {
 
     List<JobExecutionContext> ordered = new ArrayList<>(scoreQueue.size());
     while (!scoreQueue.isEmpty()) {
-      ordered.add(scoreQueue.poll().jobExecutionContext());
+      ordered.add(scoreQueue.poll().jobExecutionContext().orElseThrow());
     }
     return ordered;
   }
@@ -68,23 +69,24 @@ class TestRecommenderOrdering {
       }
 
       @Override
-      public JobExecutionContext jobExecutionContext() {
-        return new JobExecutionContext() {
-          @Override
-          public NameIdentifier nameIdentifier() {
-            return identifier;
-          }
+      public Optional<JobExecutionContext> jobExecutionContext() {
+        return Optional.of(
+            new JobExecutionContext() {
+              @Override
+              public NameIdentifier nameIdentifier() {
+                return identifier;
+              }
 
-          @Override
-          public Map<String, String> jobConfig() {
-            return Map.of();
-          }
+              @Override
+              public Map<String, String> jobOptions() {
+                return Map.of();
+              }
 
-          @Override
-          public String jobTemplateName() {
-            return "template";
-          }
-        };
+              @Override
+              public String jobTemplateName() {
+                return "template";
+              }
+            });
       }
     };
   }
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
index 8479458b45..aa3e0bda6e 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.recommender;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +69,7 @@ class TestStrategyFiltering {
           .forEach(
               strategy ->
                   identifiersByStrategyName
-                      .computeIfAbsent(strategy.name(), key -> new 
java.util.ArrayList<>())
+                      .computeIfAbsent(strategy.name(), key -> new 
ArrayList<>())
                       .add(identifier));
     }
     return identifiersByStrategyName;
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java
new file mode 100644
index 0000000000..1934b0f215
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+
+public class CompactionStrategyForTest implements Strategy {
+
+  static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  @Override
+  public String name() {
+    return "compaction-policy-for-test";
+  }
+
+  @Override
+  public String strategyType() {
+    return "compaction";
+  }
+
+  @Override
+  public Map<String, Object> rules() {
+    return ImmutableMap.of(
+        "min_datafile_mse",
+        1000,
+        StrategyUtils.JOB_ROLE_PREFIX + TARGET_FILE_SIZE_BYTES,
+        1024,
+        StrategyUtils.TRIGGER_EXPR,
+        "datafile_mse > min_datafile_mse",
+        StrategyUtils.SCORE_EXPR,
+        "datafile_mse * delete_file_num");
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return Map.of();
+  }
+
+  @Override
+  public Map<String, String> jobOptions() {
+    return Map.of(TARGET_FILE_SIZE_BYTES, "1024");
+  }
+
+  @Override
+  public String jobTemplateName() {
+    return "compaction-template";
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java
new file mode 100644
index 0000000000..5e8c545e6b
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy;
+import 
org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy.ScoreMode;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandlerContext;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class TestCompactionStrategyHandler {
+
+  private final Strategy strategy = new CompactionStrategyForTest();
+
  @Test
  void testShouldTriggerWithoutPartition() {
    // Unpartitioned table: partitioning() returns an empty array, so the handler evaluates the
    // trigger expression against table-level statistics only.
    NameIdentifier tableId = NameIdentifier.of("db", "table");
    Table tableMetadata = Mockito.mock(Table.class);
    Mockito.when(tableMetadata.partitioning())
        .thenReturn(new org.apache.gravitino.rel.expressions.transforms.Transform[0]);

    // datafile_mse = 2000 exceeds the fixture's min_datafile_mse of 1000 -> should trigger.
    List<StatisticEntry<?>> stats =
        Arrays.asList(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(2000L)));
    StrategyHandlerContext context =
        StrategyHandlerContext.builder(tableId, strategy)
            .withTableMetadata(tableMetadata)
            .withTableStatistics(stats)
            .build();
    CompactionStrategyHandler handler = new CompactionStrategyHandler();
    handler.initialize(context);
    Assertions.assertTrue(handler.shouldTrigger());

    // datafile_mse = 10 is below the threshold -> must not trigger.
    stats = Arrays.asList(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(10L)));
    StrategyHandlerContext lowContext =
        StrategyHandlerContext.builder(tableId, strategy)
            .withTableMetadata(tableMetadata)
            .withTableStatistics(stats)
            .build();
    CompactionStrategyHandler lowHandler = new CompactionStrategyHandler();
    lowHandler.initialize(lowContext);
    Assertions.assertFalse(lowHandler.shouldTrigger());
  }
+
  @Test
  void testShouldTriggerActionWithPartitions() {
    // Identity-partitioned table on column "p": trigger evaluation is partition-aware.
    NameIdentifier tableId = NameIdentifier.of("db", "table");
    Table tableMetadata = Mockito.mock(Table.class);
    Mockito.when(tableMetadata.partitioning())
        .thenReturn(
            new org.apache.gravitino.rel.expressions.transforms.Transform[] {
              Transforms.identity("p")
            });

    // One partition below the 1000 threshold (10) and one above it (2000):
    // a single qualifying partition is enough for the handler to trigger.
    Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
        Map.of(
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(10L))),
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(2000L))));

    StrategyHandlerContext context =
        StrategyHandlerContext.builder(tableId, strategy)
            .withTableMetadata(tableMetadata)
            .withTableStatistics(Arrays.asList())
            .withPartitionStatistics(partitionStats)
            .build();

    CompactionStrategyHandler handler = new CompactionStrategyHandler();
    handler.initialize(context);
    Assertions.assertTrue(handler.shouldTrigger());

    // Every partition below the threshold -> no partition qualifies -> must not trigger.
    Map<PartitionPath, List<StatisticEntry<?>>> allLowStats =
        Map.of(
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(1L))),
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(5L))));
    StrategyHandlerContext lowContext =
        StrategyHandlerContext.builder(tableId, strategy)
            .withTableMetadata(tableMetadata)
            .withTableStatistics(Arrays.asList())
            .withPartitionStatistics(allLowStats)
            .build();

    CompactionStrategyHandler lowHandler = new CompactionStrategyHandler();
    lowHandler.initialize(lowContext);
    Assertions.assertFalse(lowHandler.shouldTrigger());
  }
+
  @Test
  void testJobConfig() {
    NameIdentifier tableId = NameIdentifier.of("db", "table");
    Table tableMetadata = Mockito.mock(Table.class);
    // NOTE(review): columns() and partitioning() are not stubbed here, so Mockito returns its
    // defaults (null for arrays) — presumably CompactionJobContext tolerates that; verify.
    JobExecutionContext config =
        new CompactionJobContext(
            tableId,
            strategy.jobOptions(),
            strategy.jobTemplateName(),
            tableMetadata.columns(),
            tableMetadata.partitioning(),
            List.of());
    Assertions.assertTrue(config instanceof CompactionJobContext);
    CompactionJobContext compactionConfig = (CompactionJobContext) config;
    Assertions.assertEquals(tableId, compactionConfig.nameIdentifier());
    // Job options come straight from the fixture strategy's jobOptions().
    Assertions.assertEquals(
        ImmutableMap.of(CompactionStrategyForTest.TARGET_FILE_SIZE_BYTES, "1024"),
        compactionConfig.jobOptions());
    // An empty partition list was passed in, so none should be selected.
    Assertions.assertTrue(compactionConfig.getPartitions().isEmpty());
  }
+
  @Test
  void testEvaluatePartitionTableScoreMode() {
    NameIdentifier tableId = NameIdentifier.of("db", "table");
    Table tableMetadata = Mockito.mock(Table.class);
    Mockito.when(tableMetadata.partitioning())
        .thenReturn(
            new org.apache.gravitino.rel.expressions.transforms.Transform[] {
              Transforms.identity("p")
            });
    Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]);

    // Two partitions with scores 10 and 30 (score-expr omits delete_file_num here; the observed
    // totals below imply the per-partition score equals datafile_mse in this setup).
    Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
        Map.of(
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(10L))),
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(30L))));

    // SUM -> 10 + 30 = 40; MAX -> 30; null mode defaults to AVG -> (10 + 30) / 2 = 20.
    Assertions.assertEquals(
        40L, evaluatePartitionScore(tableId, tableMetadata, partitionStats, ScoreMode.SUM, null));
    Assertions.assertEquals(
        30L, evaluatePartitionScore(tableId, tableMetadata, partitionStats, ScoreMode.MAX, null));
    Assertions.assertEquals(
        20L, evaluatePartitionScore(tableId, tableMetadata, partitionStats, null, null));
  }
+
  @Test
  void testEvaluateMaxPartitionNumFromStrategy() {
    NameIdentifier tableId = NameIdentifier.of("db", "table");
    Table tableMetadata = Mockito.mock(Table.class);
    Mockito.when(tableMetadata.partitioning())
        .thenReturn(
            new org.apache.gravitino.rel.expressions.transforms.Transform[] {
              Transforms.identity("p")
            });
    Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]);

    // Three partitions with scores 10, 20, 30; the limit caps how many top partitions count.
    Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
        Map.of(
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(10L))),
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(20L))),
            PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "3"))),
            List.of(new StatisticEntryImpl("datafile_mse", StatisticValues.longValue(30L))));

    // Limit 1 -> only the top partition (30) is averaged; limit 2 -> top two: (30 + 20) / 2 = 25.
    Assertions.assertEquals(
        30L, evaluatePartitionScore(tableId, tableMetadata, partitionStats, null, 1));
    Assertions.assertEquals(
        25L, evaluatePartitionScore(tableId, tableMetadata, partitionStats, null, 2));
  }
+
+  @Test
+  void testTopPartitionSelectionRespectsLimitAndOrder() {
+    NameIdentifier tableId = NameIdentifier.of("db", "table");
+    Table tableMetadata = Mockito.mock(Table.class);
+    Mockito.when(tableMetadata.partitioning())
+        .thenReturn(
+            new org.apache.gravitino.rel.expressions.transforms.Transform[] {
+              Transforms.identity("p")
+            });
+    Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]);
+
+    PartitionPath lowPartition = PartitionPath.of(Arrays.asList(new 
PartitionEntryImpl("p", "1")));
+    PartitionPath highPartition = PartitionPath.of(Arrays.asList(new 
PartitionEntryImpl("p", "2")));
+    PartitionPath midPartition = PartitionPath.of(Arrays.asList(new 
PartitionEntryImpl("p", "3")));
+
+    Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+        Map.of(
+            lowPartition,
+            List.of(new StatisticEntryImpl("datafile_mse", 
StatisticValues.longValue(10L))),
+            highPartition,
+            List.of(new StatisticEntryImpl("datafile_mse", 
StatisticValues.longValue(30L))),
+            midPartition,
+            List.of(new StatisticEntryImpl("datafile_mse", 
StatisticValues.longValue(20L))));
+
+    StrategyHandlerContext context =
+        StrategyHandlerContext.builder(tableId, buildStrategy(null, 2))
+            .withTableMetadata(tableMetadata)
+            .withTableStatistics(List.of())
+            .withPartitionStatistics(partitionStats)
+            .build();
+
+    CompactionStrategyHandler handler = new CompactionStrategyHandler();
+    handler.initialize(context);
+    StrategyEvaluation evaluation = handler.evaluate();
+
+    CompactionJobContext jobContext =
+        (CompactionJobContext) evaluation.jobExecutionContext().orElseThrow();
+    List<PartitionPath> selected = jobContext.getPartitions();
+
+    Assertions.assertEquals(2, selected.size(), "Should return top two 
partitions");
+    Assertions.assertEquals(highPartition, selected.get(0), "Highest score 
should come first");
+    Assertions.assertEquals(midPartition, selected.get(1), "Second highest 
score expected");
+  }
+
+  private long evaluatePartitionScore(
+      NameIdentifier tableId,
+      Table tableMetadata,
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStats,
+      ScoreMode scoreMode,
+      Integer maxPartitionNum) {
+    StrategyHandlerContext context =
+        StrategyHandlerContext.builder(tableId, buildStrategy(scoreMode, 
maxPartitionNum))
+            .withTableMetadata(tableMetadata)
+            .withTableStatistics(List.of())
+            .withPartitionStatistics(partitionStats)
+            .build();
+
+    CompactionStrategyHandler handler = new CompactionStrategyHandler();
+    handler.initialize(context);
+    StrategyEvaluation evaluation = handler.evaluate();
+    Assertions.assertTrue(evaluation.jobExecutionContext().isPresent());
+    return evaluation.score();
+  }
+
+  private Strategy buildStrategy(ScoreMode scoreMode, Integer maxPartitionNum) 
{
+    Map<String, Object> rules = new HashMap<>();
+    rules.put(StrategyUtils.TRIGGER_EXPR, "datafile_mse > 0");
+    rules.put(StrategyUtils.SCORE_EXPR, "datafile_mse");
+    if (scoreMode != null) {
+      rules.put(GravitinoStrategy.PARTITION_TABLE_SCORE_MODE, scoreMode);
+    }
+    if (maxPartitionNum != null) {
+      rules.put(GravitinoStrategy.MAX_PARTITION_NUM, maxPartitionNum);
+    }
+    return new PartitionStrategy() {
+      @Override
+      public String name() {
+        return "compaction-score-mode-test";
+      }
+
+      @Override
+      public String strategyType() {
+        return "compaction";
+      }
+
+      @Override
+      public Map<String, Object> rules() {
+        return rules;
+      }
+
+      @Override
+      public Map<String, String> properties() {
+        return Map.of();
+      }
+
+      @Override
+      public Map<String, String> jobOptions() {
+        return Map.of();
+      }
+
+      @Override
+      public String jobTemplateName() {
+        return "compaction-template";
+      }
+
+      @Override
+      public ScoreMode partitionTableScoreMode() {
+        Object value = rules.get(GravitinoStrategy.PARTITION_TABLE_SCORE_MODE);
+        return value instanceof ScoreMode ? (ScoreMode) value : ScoreMode.AVG;
+      }
+
+      @Override
+      public int maxPartitionNum() {
+        Object value = rules.get(GravitinoStrategy.MAX_PARTITION_NUM);
+        if (value == null) {
+          return 100;
+        }
+        if (value instanceof Number) {
+          int parsed = ((Number) value).intValue();
+          return parsed > 0 ? parsed : 100;
+        }
+        try {
+          int parsed = Integer.parseInt(value.toString());
+          return parsed > 0 ? parsed : 100;
+        } catch (NumberFormatException e) {
+          return 100;
+        }
+      }
+    };
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
new file mode 100644
index 0000000000..83062a773b
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+
// Requires a running Gravitino server and Spark environment; enable with GRAVITINO_ENV_IT=true.
@EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
public class TestBuiltinIcebergRewriteDataFiles {

  // Endpoints/names of the externally provisioned services. NOTE(review): these are
  // hard-coded for a local IT setup — confirm they match the environment before enabling.
  private static final String SERVER_URI = "http://localhost:8090";
  private static final String METALAKE_NAME = "test";
  private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg";
  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-rewrite-data-files";
  private static final String SPARK_CATALOG_NAME = "rest_catalog";
  // Empty warehouse location: presumably the REST catalog supplies its own default —
  // TODO confirm against the Iceberg REST service configuration.
  private static final String WAREHOUSE_LOCATION = "";

  /** Submits a rewrite-data-files job with a hand-built config map and waits for success. */
  @Test
  void testSubmitRewriteDataFilesJobByConfig() throws Exception {
    String tableName = "rewrite_table1";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createTableAndInsertData(spark, fullTableName);
          Map<String, String> jobConf = buildManualJobConfig(tableName);
          submitCompactionJob(metalake, jobConf);
        });
  }

  /**
   * Submits a job whose config is produced via the adapter/OptimizerConfig pipeline rather than
   * being assembled by hand, and waits for success.
   */
  @Test
  void testSubmitBuiltinIcebergRewriteDataFilesJobFromAdapterAndOptimizerConfig() throws Exception {
    String tableName = "rewrite_table2";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createTableAndInsertData(spark, fullTableName);
          OptimizerConfig optimizerConfig = createOptimizerConfig();
          Map<String, String> jobOptions =
              Map.of("min-input-files", "1", "target-file-size-bytes", "1048576");
          Map<String, String> jobConf =
              buildCompactionJobConfig(
                  optimizerConfig,
                  tableName,
                  jobOptions,
                  new Column[0],
                  new Transform[0],
                  Collections.emptyList());
          submitCompactionJob(metalake, jobConf);
        });
  }

  /** Compacts a non-partitioned table and asserts the data-file count decreases. */
  @Test
  void testCompactNonPartitionTable() throws Exception {
    String tableName = "rewrite_non_partition_table";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;

    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createTableAndInsertData(spark, fullTableName);

          long beforeFiles = countDataFiles(spark, fullTableName);
          Assertions.assertTrue(beforeFiles > 1, "Expected multiple data files before compaction");

          OptimizerConfig optimizerConfig = createOptimizerConfig();
          // min-input-files=1 forces the rewrite even for a small number of files.
          Map<String, String> jobOptions = Map.of("min-input-files", "1");
          Map<String, String> jobConf =
              buildCompactionJobConfig(
                  optimizerConfig,
                  tableName,
                  jobOptions,
                  new Column[0],
                  new Transform[0],
                  Collections.emptyList());
          submitCompactionJob(metalake, jobConf);

          refreshTable(spark, fullTableName);
          long afterFiles = countDataFiles(spark, fullTableName);
          Assertions.assertTrue(
              afterFiles < beforeFiles,
              String.format(
                  "Expected fewer data files after compaction after:%d, before:%d",
                  afterFiles, beforeFiles));
        });
  }

  /**
   * Compacts only two of four partitions of a partitioned table and asserts that the targeted
   * partitions end up with larger (merged) files while the untouched partitions are unchanged.
   */
  @Test
  void testCompactPartitionTable() throws Exception {
    String tableName = "rewrite_partition_table";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;

    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createPartitionTableAndInsertData(spark, fullTableName);

          // Snapshot per-partition file counts and max file sizes before compaction.
          Map<String, Long> beforeCounts = countDataFilesByPartition(spark, fullTableName);
          Map<String, Long> beforeMaxSizes = maxFileSizeByPartition(spark, fullTableName);
          String targetPartition1 = partitionKey(2024, "1");
          String targetPartition2 = partitionKey(2024, "2");
          String otherPartition1 = partitionKey(2025, "1");
          String otherPartition2 = partitionKey(2025, "2");

          Assertions.assertTrue(
              beforeCounts.getOrDefault(targetPartition1, 0L) > 1,
              "Expected multiple data files in " + targetPartition1 + " before compaction");
          Assertions.assertTrue(
              beforeCounts.getOrDefault(targetPartition2, 0L) > 1,
              "Expected multiple data files in " + targetPartition2 + " before compaction");
          Assertions.assertTrue(
              beforeCounts.getOrDefault(otherPartition1, 0L) > 1,
              "Expected multiple data files in " + otherPartition1 + " before compaction");
          Assertions.assertTrue(
              beforeCounts.getOrDefault(otherPartition2, 0L) > 1,
              "Expected multiple data files in " + otherPartition2 + " before compaction");

          OptimizerConfig optimizerConfig = createOptimizerConfig();

          // Only the two 2024 partitions are targeted for compaction.
          List<PartitionPath> partitions =
              Arrays.asList(
                  PartitionPath.of(
                      Arrays.asList(
                          new PartitionEntryImpl("year", "2024"),
                          new PartitionEntryImpl("month", "1"))),
                  PartitionPath.of(
                      Arrays.asList(
                          new PartitionEntryImpl("year", "2024"),
                          new PartitionEntryImpl("month", "2"))));
          Column[] columns =
              new Column[] {
                column("year", Types.IntegerType.get()), column("month", Types.StringType.get())
              };
          Transform[] partitioning =
              new Transform[] {Transforms.identity("year"), Transforms.identity("month")};

          Map<String, String> jobOptions = Map.of("min-input-files", "1");
          Map<String, String> jobConf =
              buildCompactionJobConfig(
                  optimizerConfig, tableName, jobOptions, columns, partitioning, partitions);
          submitCompactionJob(metalake, jobConf);

          refreshTable(spark, fullTableName);
          Map<String, Long> afterCounts = countDataFilesByPartition(spark, fullTableName);
          Map<String, Long> afterMaxSizes = maxFileSizeByPartition(spark, fullTableName);
          // Targeted partitions: compaction merges small files, so the max file size grows.
          Assertions.assertTrue(
              afterMaxSizes.getOrDefault(targetPartition1, 0L)
                  > beforeMaxSizes.getOrDefault(targetPartition1, 0L),
              "Expected larger data files after compaction for " + targetPartition1);
          Assertions.assertTrue(
              afterMaxSizes.getOrDefault(targetPartition2, 0L)
                  > beforeMaxSizes.getOrDefault(targetPartition2, 0L),
              "Expected larger data files after compaction for " + targetPartition2);
          // Untouched partitions: neither file count nor max size may change.
          Assertions.assertEquals(
              beforeCounts.getOrDefault(otherPartition1, 0L),
              afterCounts.getOrDefault(otherPartition1, 0L),
              "Expected no compaction for " + otherPartition1);
          Assertions.assertEquals(
              beforeCounts.getOrDefault(otherPartition2, 0L),
              afterCounts.getOrDefault(otherPartition2, 0L),
              "Expected no compaction for " + otherPartition2);
          Assertions.assertEquals(
              beforeMaxSizes.getOrDefault(otherPartition1, 0L),
              afterMaxSizes.getOrDefault(otherPartition1, 0L),
              "Expected no size changes for " + otherPartition1);
          Assertions.assertEquals(
              beforeMaxSizes.getOrDefault(otherPartition2, 0L),
              afterMaxSizes.getOrDefault(otherPartition2, 0L),
              "Expected no size changes for " + otherPartition2);
        });
  }

  /** Loads the metalake if it exists; otherwise creates it with empty properties. */
  private static GravitinoMetalake loadOrCreateMetalake(
      GravitinoAdminClient client, String metalakeName) {
    try {
      return client.loadMetalake(metalakeName);
    } catch (NoSuchMetalakeException ignored) {
      // Metalake does not exist yet — create it on first use.
      return client.createMetalake(metalakeName, "IT metalake", Map.of());
    }
  }

  /** Callback receiving a live SparkSession and metalake; both are managed by the caller. */
  @FunctionalInterface
  private interface SparkMetalakeConsumer {
    void accept(SparkSession spark, GravitinoMetalake metalake) throws Exception;
  }

  /**
   * Runs the consumer with a fresh SparkSession and an admin-client-backed metalake, stopping the
   * SparkSession and closing the client afterwards even on failure.
   */
  private static void runWithSparkAndMetalake(SparkMetalakeConsumer consumer) throws Exception {
    SparkSession spark = createSparkSession();
    try (GravitinoAdminClient client = GravitinoAdminClient.builder(SERVER_URI).build()) {
      GravitinoMetalake metalake = loadOrCreateMetalake(client, METALAKE_NAME);
      consumer.accept(spark, metalake);
    } finally {
      spark.stop();
    }
  }

  /**
   * Submits the compaction job template with the given config, polls until it reaches a terminal
   * state (up to 5 minutes), and asserts it succeeded.
   */
  private static void submitCompactionJob(GravitinoMetalake metalake, Map<String, String> jobConf) {
    JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf);
    System.out.println("Submitted job id: " + jobHandle.jobId());
    Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank");

    // Poll every 2s until the job leaves the running states.
    Awaitility.await()
        .atMost(Duration.ofMinutes(5))
        .pollInterval(Duration.ofSeconds(2))
        .until(
            () -> {
              JobHandle.Status status = metalake.getJob(jobHandle.jobId()).jobStatus();
              return status == JobHandle.Status.SUCCEEDED
                  || status == JobHandle.Status.FAILED
                  || status == JobHandle.Status.CANCELLED;
            });

    JobHandle.Status finalStatus = metalake.getJob(jobHandle.jobId()).jobStatus();
    Assertions.assertEquals(JobHandle.Status.SUCCEEDED, finalStatus, "Job should succeed");
  }

  /** Hand-builds the job config map (no adapter), merged with the submitter configs. */
  private static Map<String, String> buildManualJobConfig(String tableName) {
    Map<String, String> jobConf = new HashMap<>();
    jobConf.put("table_identifier", "db." + tableName);
    jobConf.put("where_clause", "");
    jobConf.put("sort_order", "");
    jobConf.put("strategy", "binpack");
    jobConf.put("options", "{\"min-input-files\":\"1\"}");
    jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs());
    return jobConf;
  }

  /**
   * Builds the job config through the production path: a CompactionJobContext fed to the
   * GravitinoCompactionJobAdapter via GravitinoJobSubmitter.
   */
  private static Map<String, String> buildCompactionJobConfig(
      OptimizerConfig optimizerConfig,
      String tableName,
      Map<String, String> jobOptions,
      Column[] columns,
      Transform[] partitioning,
      List<PartitionPath> partitions) {
    CompactionJobContext jobContext =
        new CompactionJobContext(
            NameIdentifier.of(SPARK_CATALOG_NAME, "db", tableName),
            jobOptions,
            JOB_TEMPLATE_NAME,
            columns,
            partitioning,
            partitions);
    GravitinoCompactionJobAdapter adapter = new GravitinoCompactionJobAdapter();
    return GravitinoJobSubmitter.buildJobConfig(optimizerConfig, jobContext, adapter);
  }

  /** Counts data files (content = 0 excludes delete files) via the Iceberg `.files` metadata table. */
  private static long countDataFiles(SparkSession spark, String fullTableName) {
    Row row =
        spark
            .sql(
                "SELECT count(*) AS data_file_cnt FROM "
                    + fullTableName
                    + ".files WHERE content = 0")
            .first();
    return row.getLong(0);
  }

  /** Refreshes Spark's cached table metadata so post-compaction queries see the new snapshot. */
  private static void refreshTable(SparkSession spark, String fullTableName) {
    spark.sql("REFRESH TABLE " + fullTableName);
  }

  /** Returns data-file counts keyed by "year=Y,month=M" partition keys. */
  private static Map<String, Long> countDataFilesByPartition(
      SparkSession spark, String fullTableName) {
    List<Row> rows =
        spark
            .sql(
                "SELECT partition.year AS year, partition.month AS month, COUNT(*) AS data_file_cnt "
                    + "FROM "
                    + fullTableName
                    + ".files WHERE content = 0 "
                    + "GROUP BY partition.year, partition.month")
            .collectAsList();
    Map<String, Long> counts = new HashMap<>();
    for (Row row : rows) {
      int year = row.getInt(0);
      String month = row.getString(1);
      long count = row.getLong(2);
      counts.put(partitionKey(year, month), count);
    }
    return counts;
  }

  /** Returns the largest data-file size per partition, keyed by "year=Y,month=M". */
  private static Map<String, Long> maxFileSizeByPartition(
      SparkSession spark, String fullTableName) {
    List<Row> rows =
        spark
            .sql(
                "SELECT partition.year AS year, partition.month AS month, "
                    + "MAX(file_size_in_bytes) AS max_size "
                    + "FROM "
                    + fullTableName
                    + ".files WHERE content = 0 "
                    + "GROUP BY partition.year, partition.month")
            .collectAsList();
    Map<String, Long> sizes = new HashMap<>();
    for (Row row : rows) {
      int year = row.getInt(0);
      String month = row.getString(1);
      long maxSize = row.getLong(2);
      sizes.put(partitionKey(year, month), maxSize);
    }
    return sizes;
  }

  /** Canonical map key for a (year, month) partition. */
  private static String partitionKey(int year, String month) {
    return "year=" + year + ",month=" + month;
  }

  /** Shorthand for building a ColumnDTO with just a name and type. */
  private static Column column(String name, org.apache.gravitino.rel.types.Type type) {
    return ColumnDTO.builder().withName(name).withDataType(type).build();
  }

  /**
   * Builds an OptimizerConfig whose job-submitter properties point the Spark job at the local
   * Iceberg REST catalog with a small local[2] footprint.
   */
  private static OptimizerConfig createOptimizerConfig() {
    Map<String, String> optimizerConfigProps = new HashMap<>();
    optimizerConfigProps.put(OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "catalog_type", "rest");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "catalog_uri", ICEBERG_REST_URI);
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "warehouse_location", WAREHOUSE_LOCATION);
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "catalog_name", SPARK_CATALOG_NAME);
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_master", "local[2]");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_executor_instances", "1");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_executor_cores", "1");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_executor_memory", "1g");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_driver_memory", "1g");
    optimizerConfigProps.put(OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_conf", "{}");
    return new OptimizerConfig(optimizerConfigProps);
  }

  /** Local Spark session with the Iceberg extensions and a REST catalog (cache disabled). */
  private static SparkSession createSparkSession() {
    return SparkSession.builder()
        .master("local[2]")
        .appName("builtin-iceberg-rewrite-data-files-it")
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".type", "rest")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".cache-enabled", "false")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".uri", ICEBERG_REST_URI)
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".warehouse", WAREHOUSE_LOCATION)
        .getOrCreate();
  }

  /**
   * Creates a table partitioned by (year, month) and inserts 20 rows per partition via individual
   * INSERTs, producing many small data files per partition for compaction to merge.
   */
  private static void createPartitionTableAndInsertData(SparkSession spark, String fullTableName) {
    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db");
    spark.sql("DROP TABLE IF EXISTS " + fullTableName);
    spark.sql(
        "CREATE TABLE "
            + fullTableName
            + " (id INT, data STRING, year INT, month STRING) USING iceberg "
            + "PARTITIONED BY (year, month)");
    spark.sql(
        "ALTER TABLE "
            + fullTableName
            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
    int id = 0;
    int[] years = new int[] {2024, 2025};
    String[] months = new String[] {"1", "2"};
    int rowsPerPartition = 20;
    for (int year : years) {
      for (String month : months) {
        for (int i = 0; i < rowsPerPartition; i++) {
          // One INSERT per row deliberately creates one small file per statement.
          spark.sql(
              "INSERT INTO "
                  + fullTableName
                  + " VALUES ("
                  + id
                  + ", 'value_"
                  + id
                  + "', "
                  + year
                  + ", '"
                  + month
                  + "'"
                  + ")");
          id++;
        }
      }
    }
  }

  /**
   * Creates a non-partitioned table and inserts 10 rows via individual INSERTs, producing multiple
   * small data files for compaction to merge.
   */
  private static void createTableAndInsertData(SparkSession spark, String fullTableName) {
    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db");
    spark.sql("DROP TABLE IF EXISTS " + fullTableName);
    spark.sql("CREATE TABLE " + fullTableName + " (id INT, data STRING) USING iceberg");
    spark.sql(
        "ALTER TABLE "
            + fullTableName
            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
    for (int i = 0; i < 10; i++) {
      spark.sql("INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + "')");
    }
  }
}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoCompactionJobAdapter.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoCompactionJobAdapter.java
new file mode 100644
index 0000000000..500138a7fb
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoCompactionJobAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoCompactionJobAdapter {
+
+  @Test
+  public void testJobTemplateName() {
+    GravitinoCompactionJobAdapter jobAdapter = new 
GravitinoCompactionJobAdapter();
+    Assertions.assertEquals(
+        Map.of(
+            "table_identifier", "db.table",
+            "where_clause", "",
+            "sort_order", "",
+            "strategy", "binpack",
+            "options", "{\"target_file_size_bytes\":\"1073741824\"}"),
+        jobAdapter.jobConfig(mockCompactionJobContext()));
+  }
+
+  private CompactionJobContext mockCompactionJobContext() {
+    String jobTemplateName = "compaction-job-template";
+    Column[] columns = new Column[0];
+    Transform[] partitioning = new Transform[0];
+    Map<String, String> jobOptions = Map.of("target_file_size_bytes", 
"1073741824");
+    return new CompactionJobContext(
+        NameIdentifier.of("catalog", "db", "table"),
+        jobOptions,
+        jobTemplateName,
+        columns,
+        partitioning,
+        List.of());
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoJobSubmitter.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoJobSubmitter.java
new file mode 100644
index 0000000000..621f41f7d5
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoJobSubmitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoJobSubmitter {
+  @Test
+  void loadJobAdapterReturnsCompactionAdapter() {
+    GravitinoJobSubmitter submitter = new GravitinoJobSubmitter();
+    GravitinoJobAdapter adapter = 
submitter.loadJobAdapter(CompactionStrategyHandler.NAME);
+    Assertions.assertTrue(adapter instanceof GravitinoCompactionJobAdapter);
+  }
+
+  @Test
+  void loadJobAdapterFallsBackToConfiguredClassName() {
+    String jobTemplateName = "custom";
+    OptimizerConfig config =
+        new OptimizerConfig(
+            Map.of(
+                OptimizerConfig.GRAVITINO_URI,
+                "http://localhost:8090";,
+                OptimizerConfig.GRAVITINO_METALAKE,
+                "test-metalake",
+                OptimizerConfig.JOB_ADAPTER_PREFIX + jobTemplateName + 
".className",
+                GravitinoCompactionJobAdapter.class.getName()));
+    GravitinoJobSubmitter submitter = new GravitinoJobSubmitter();
+    submitter.initialize(new OptimizerEnv(config));
+
+    GravitinoJobAdapter adapter = submitter.loadJobAdapter(jobTemplateName);
+    Assertions.assertTrue(adapter instanceof GravitinoCompactionJobAdapter);
+  }
+
+  @Test
+  void buildJobConfigMergesWithExpectedPrecedence() {
+    OptimizerConfig config =
+        new OptimizerConfig(
+            Map.of(
+                OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "custom", 
"optimizer",
+                OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "override", 
"optimizer"));
+    GravitinoJobSubmitter submitter = new GravitinoJobSubmitter();
+    submitter.initialize(new OptimizerEnv(config));
+
+    JobExecutionContext context =
+        new JobExecutionContext() {
+          @Override
+          public NameIdentifier nameIdentifier() {
+            return NameIdentifier.of("db", "table");
+          }
+
+          @Override
+          public Map<String, String> jobOptions() {
+            return Map.of("context", "context", "override", "context");
+          }
+
+          @Override
+          public String jobTemplateName() {
+            return "compaction";
+          }
+        };
+
+    GravitinoJobAdapter adapter =
+        jobExecutionContext ->
+            Map.of("table", "db.table", "options", "map('k','v')", "override", 
"adapter");
+
+    Map<String, String> merged = GravitinoJobSubmitter.buildJobConfig(config, 
context, adapter);
+
+    Assertions.assertEquals("optimizer", merged.get("custom"));
+    Assertions.assertEquals("adapter", merged.get("override"));
+    Assertions.assertEquals("db.table", merged.get("table"));
+    Assertions.assertEquals("map('k','v')", merged.get("options"));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestPartitionUtils.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestPartitionUtils.java
new file mode 100644
index 0000000000..6ff4859513
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestPartitionUtils.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPartitionUtils {
+
+  private static Column column(String name, 
org.apache.gravitino.rel.types.Type type) {
+    return ColumnDTO.builder().withName(name).withDataType(type).build();
+  }
+
+  @Test
+  void testIdentityStringPartition() {
+    List<PartitionEntry> partitions = Arrays.asList(new 
PartitionEntryImpl("colA", "O'Hara"));
+    Column[] columns = new Column[] {column("colA", Types.StringType.get())};
+    Transform[] transforms = new Transform[] {Transforms.identity("colA")};
+
+    String where = PartitionUtils.getWhereClauseForPartition(partitions, 
columns, transforms);
+
+    Assertions.assertEquals("colA = \"O'Hara\"", where);
+  }
+
+  @Test
+  void testIdentityStringPartitionEscapesQuotesAndBackslashes() {
+    List<PartitionEntry> partitions = Arrays.asList(new 
PartitionEntryImpl("colA", "a\"b\\c"));
+    Column[] columns = new Column[] {column("colA", Types.StringType.get())};
+    Transform[] transforms = new Transform[] {Transforms.identity("colA")};
+
+    String where = PartitionUtils.getWhereClauseForPartition(partitions, 
columns, transforms);
+
+    Assertions.assertEquals("colA = \"a\\\"b\\\\c\"", where);
+  }
+
+  @Test
+  void testIdentityTimestampPartition() {
+    List<PartitionEntry> partitions =
+        Arrays.asList(new PartitionEntryImpl("ts", "2024-01-01 00:00:00"));
+    Column[] columns = new Column[] {column("ts", 
Types.TimestampType.withTimeZone())};
+    Transform[] transforms = new Transform[] {Transforms.identity("ts")};
+
+    String where = PartitionUtils.getWhereClauseForPartition(partitions, 
columns, transforms);
+
+    Assertions.assertEquals("ts = TIMESTAMP '2024-01-01 00:00:00'", where);
+  }
+
+  @Test
+  void testBucketAndIdentityPartition() {
+    List<PartitionEntry> partitions =
+        Arrays.asList(new PartitionEntryImpl("colA", "abc"), new 
PartitionEntryImpl("bucket", "1"));
+    Column[] columns =
+        new Column[] {
+          column("colA", Types.StringType.get()), column("colB", 
Types.IntegerType.get())
+        };
+    Transform[] transforms =
+        new Transform[] {Transforms.identity("colA"), Transforms.bucket(2, new 
String[] {"colB"})};
+
+    String where = PartitionUtils.getWhereClauseForPartition(partitions, 
columns, transforms);
+
+    Assertions.assertEquals("colA = \"abc\" AND bucket(colB, 2) = 1", where);
+  }
+
+  @Test
+  void testTruncateAndYearPartition() {
+    List<PartitionEntry> partitions =
+        Arrays.asList(
+            new PartitionEntryImpl("truncate", "prefix"), new 
PartitionEntryImpl("year", "2024"));
+    Column[] columns =
+        new Column[] {
+          column("colC", Types.StringType.get()),
+          column("colD", Types.TimestampType.withoutTimeZone())
+        };
+    Transform[] transforms =
+        new Transform[] {Transforms.truncate(5, "colC"), 
Transforms.year("colD")};
+
+    String where = PartitionUtils.getWhereClauseForPartition(partitions, 
columns, transforms);
+
+    Assertions.assertEquals("truncate(colC, 5) = \"prefix\" AND year(colD) = 
2024", where);
+  }
+
+  @Test
+  void 
testWhereClauseCombinesMultiplePartitionsWithParenthesesForStringColumns() {
+    Column[] columns =
+        new Column[] {column("a", Types.StringType.get()), column("b", 
Types.StringType.get())};
+
+    List<PartitionEntry> p1 =
+        Arrays.asList(new PartitionEntryImpl("p1", "x1"), new 
PartitionEntryImpl("p1", "y1"));
+    List<PartitionEntry> p2 =
+        Arrays.asList(new PartitionEntryImpl("p2", "x2"), new 
PartitionEntryImpl("p2", "y2"));
+    List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1), 
PartitionPath.of(p2));
+
+    Transform[] partitioning = new Transform[] {Transforms.identity("a"), 
Transforms.identity("b")};
+
+    String where = PartitionUtils.getWhereClauseForPartitions(partitions, 
columns, partitioning);
+
+    Assertions.assertEquals("(a = \"x1\" AND b = \"y1\") OR (a = \"x2\" AND b 
= \"y2\")", where);
+  }
+
+  @Test
+  void testWhereClauseEmitsNumericLiteralsWithoutQuotes() {
+    Column[] columns =
+        new Column[] {column("id", Types.LongType.get()), column("score", 
Types.DoubleType.get())};
+
+    List<PartitionEntry> p1 =
+        Arrays.asList(new PartitionEntryImpl("p1", "123"), new 
PartitionEntryImpl("p1", "45.6"));
+    List<PartitionEntry> p2 =
+        Arrays.asList(new PartitionEntryImpl("p2", "456"), new 
PartitionEntryImpl("p2", "78.9"));
+    List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1), 
PartitionPath.of(p2));
+
+    Transform[] partitioning =
+        new Transform[] {Transforms.identity("id"), 
Transforms.identity("score")};
+
+    String where = PartitionUtils.getWhereClauseForPartitions(partitions, 
columns, partitioning);
+
+    Assertions.assertEquals("(id = 123 AND score = 45.6) OR (id = 456 AND 
score = 78.9)", where);
+  }
+
+  @Test
+  void testIdentityOnDateColumnIsFormattedAsDateLiteral() {
+    Column[] columns = new Column[] {column("dt", Types.DateType.get())};
+
+    List<PartitionEntry> p1 = Arrays.asList(new PartitionEntryImpl("p", 
"2024-01-01"));
+    List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1));
+
+    Transform[] partitioning = new Transform[] {Transforms.identity("dt")};
+
+    String where = PartitionUtils.getWhereClauseForPartitions(partitions, 
columns, partitioning);
+
+    Assertions.assertEquals("(dt = DATE '2024-01-01')", where);
+  }
+
+  @Test
+  void testWhereClauseWithMixedTransformsAndMultiplePartitions() {
+    Column[] columns =
+        new Column[] {
+          column("event.ts", Types.TimestampType.withoutTimeZone()),
+          column("country", Types.StringType.get()),
+          column("user_id", Types.LongType.get()),
+          column("flag", Types.BooleanType.get())
+        };
+
+    Transform[] partitioning =
+        new Transform[] {
+          Transforms.hour(new String[] {"event", "ts"}),
+          Transforms.truncate(3, "country"),
+          Transforms.bucket(16, new String[] {"user_id"}),
+          Transforms.identity("flag")
+        };
+
+    List<PartitionEntry> p1 =
+        Arrays.asList(
+            new PartitionEntryImpl("p", "8"),
+            new PartitionEntryImpl("p", "usa"),
+            new PartitionEntryImpl("p", "5"),
+            new PartitionEntryImpl("p", "false"));
+    List<PartitionEntry> p2 =
+        Arrays.asList(
+            new PartitionEntryImpl("p", "12"),
+            new PartitionEntryImpl("p", "can"),
+            new PartitionEntryImpl("p", "9"),
+            new PartitionEntryImpl("p", "true"));
+    List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1), 
PartitionPath.of(p2));
+
+    String where = PartitionUtils.getWhereClauseForPartitions(partitions, 
columns, partitioning);
+
+    Assertions.assertEquals(
+        "(hour(event.ts) = 8 AND truncate(country, 3) = \"usa\" AND 
bucket(user_id, 16) = 5 AND flag = false) "
+            + "OR (hour(event.ts) = 12 AND truncate(country, 3) = \"can\" AND 
bucket(user_id, 16) = 9 AND flag = true)",
+        where);
+  }
+
+  @Test
+  void testNullPartitionsThrowsIllegalArgumentException() {
+    Column[] columns = new Column[] {column("a", Types.StringType.get())};
+    Transform[] partitioning = new Transform[] {Transforms.identity("a")};
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> PartitionUtils.getWhereClauseForPartitions(null, columns, 
partitioning));
+  }
+
+  @Test
+  void testEmptyColumnsThrowsIllegalArgumentException() {
+    List<PartitionEntry> p1 = Arrays.asList(new PartitionEntryImpl("p", "v"));
+    List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1));
+    Column[] columns = new Column[0];
+    Transform[] partitioning = new Transform[] {Transforms.identity("p")};
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> PartitionUtils.getWhereClauseForPartitions(partitions, columns, 
partitioning));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestQLExpressionEvaluator.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestQLExpressionEvaluator.java
new file mode 100644
index 0000000000..cf372fcd38
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestQLExpressionEvaluator.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestQLExpressionEvaluator {
+
+  private final QLExpressionEvaluator evaluator = new QLExpressionEvaluator();
+
+  @Test
+  void testEvaluateLongWithValidExpression() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("a", 10);
+    context.put("b", 20);
+
+    long result = evaluator.evaluateLong("a + b", context);
+    assertEquals(30L, result);
+  }
+
+  @Test
+  void testEvaluateLongWithDecimalResult() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("a", 10);
+    context.put("b", 3);
+
+    long result = evaluator.evaluateLong("a / b", context);
+    assertEquals(3L, result); // Should truncate decimal part
+  }
+
+  @Test
+  void testEvaluateBoolWithTrueCondition() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("x", 5);
+    context.put("y", 10);
+
+    boolean result = evaluator.evaluateBool("x < y", context);
+    assertTrue(result);
+  }
+
+  @Test
+  void testEvaluateBoolWithFalseCondition() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("x", 15);
+    context.put("y", 10);
+
+    boolean result = evaluator.evaluateBool("x < y", context);
+    Assertions.assertFalse(result);
+  }
+
+  @Test
+  void testEvaluateWithMissingVariable() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("a", 10);
+
+    assertThrows(
+        RuntimeException.class,
+        () -> {
+          evaluator.evaluateLong("a + b", context);
+        });
+  }
+
+  @Test
+  void testEvaluateWithInvalidExpression() {
+    Map<String, Object> context = new HashMap<>();
+
+    assertThrows(
+        RuntimeException.class,
+        () -> {
+          evaluator.evaluateLong("invalid expression", context);
+        });
+  }
+
+  @Test
+  void testEvaluateWithConstantExpression() {
+    Assertions.assertThrowsExactly(
+        IllegalArgumentException.class, () -> evaluator.evaluateLong("1 + 1", 
null));
+  }
+
+  @Test
+  void testEvaluateWithNullExpression() {
+    Map<String, Object> context = new HashMap<>();
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          evaluator.evaluateLong(null, context);
+        });
+  }
+
+  @Test
+  void testEvaluateWithDifferentVariableTypes() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("intVal", 10);
+    context.put("doubleVal", 5.5);
+    context.put("stringVal", "hello");
+    context.put("boolVal", true);
+
+    // Test numeric operations
+    long numericResult = evaluator.evaluateLong("intVal + doubleVal", context);
+    assertEquals(16L, numericResult); // 10 + 5.5 = 15.5 -> rounded to 16
+
+    // Test boolean operations
+    boolean boolResult = evaluator.evaluateBool("boolVal && (intVal > 5)", 
context);
+    assertTrue(boolResult);
+  }
+
+  @Test
+  void testHyphenatedIdentifiersDoNotBreakSubtraction() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("metric-1", 10);
+    context.put("metric2", 4);
+
+    long result = evaluator.evaluateLong("metric-1 - metric2", context);
+    assertEquals(6L, result);
+  }
+
+  @Test
+  void testNegativeLiteralPreserved() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("x", 2);
+
+    long result = evaluator.evaluateLong("x + -1", context);
+    assertEquals(1L, result);
+  }
+
+  @Test
+  void testHyphenatedIdentifierNotMatchedInsideLargerToken() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("metric-1", 5);
+    context.put("metric-1-extra", 7);
+
+    long result = evaluator.evaluateLong("metric-1 + metric-1-extra", context);
+    assertEquals(12L, result);
+  }
+
+  @Test
+  void testHyphenatedIdentifierNextToDotIsNotRewritten() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("metric-1", 3);
+    context.put("a", 2);
+
+    long result = evaluator.evaluateLong("a + metric-1", context);
+    assertEquals(5L, result);
+  }
+}


Reply via email to