This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new a670864cfd [#9653] feat(optimizer): add compaction strategy related
impls (#9904)
a670864cfd is described below
commit a670864cfd5009ef4005f5218114b925ad5bab0e
Author: FANNG <[email protected]>
AuthorDate: Thu Feb 12 12:28:05 2026 +0900
[#9653] feat(optimizer): add compaction strategy related impls (#9904)
### What changes were proposed in this pull request?
* Adds QLExpress4 dependency for dynamic expression evaluation in
strategy triggers and scoring
* Implements compaction strategy handler with partition-aware
recommendation logic
* Adds partition utility methods for generating SQL WHERE clauses from
partition metadata
Example Output (CompactionJobAdapter) Given a job context for db.table
with two partitions (a='x1', b='y1') and (a='x2', b='y2'), and job
options
{target-file-size-bytes: "1024"}:
```
jobConfig() ->
{
"table": "db.table",
"where": "(a = 'x1' AND b = 'y1') OR (a = 'x2' AND b = 'y2')",
"options": "{\"target-file-size-bytes\", \"1024\"}"
}
```
### Why are the changes needed?
Fix: #9653
### Does this PR introduce _any_ user-facing change? no
### How was this patch tested?
existing tests
---
LICENSE.bin | 1 +
gradle/libs.versions.toml | 2 +
maintenance/optimizer/build.gradle.kts | 41 ++
.../optimizer/api/common/PartitionStrategy.java | 66 +++
.../maintenance/optimizer/api/common/Strategy.java | 2 +-
.../api/recommender/JobExecutionContext.java | 10 +-
.../api/recommender/StrategyEvaluation.java | 26 +-
.../optimizer/common/conf/OptimizerConfig.java | 23 ++
.../optimizer/recommender/Recommender.java | 70 +++-
.../handler/BaseExpressionStrategyHandler.java | 285 +++++++++++++
.../handler/StrategyEvaluationImpl.java} | 34 +-
.../handler/compaction/CompactionJobContext.java | 57 +++
.../compaction/CompactionStrategyHandler.java | 69 ++++
.../job/GravitinoCompactionJobAdapter.java | 83 ++++
.../recommender/job/GravitinoJobSubmitter.java | 76 +++-
.../optimizer/recommender/job/PartitionUtils.java | 176 ++++++++
.../statistics/GravitinoStatisticsProvider.java | 3 +-
.../recommender/strategy/GravitinoStrategy.java | 62 ++-
.../recommender/util/ExpressionEvaluator.java | 49 +++
.../recommender/util/QLExpressionEvaluator.java | 112 ++++++
.../util/StatisticsUtils.java} | 36 +-
.../optimizer/recommender/util/StrategyUtils.java | 65 +++
.../optimizer/common/conf/TestOptimizerConfig.java | 16 +
.../recommender/TestRecommenderOrdering.java | 34 +-
.../recommender/TestStrategyFiltering.java | 3 +-
.../compaction/CompactionStrategyForTest.java | 68 ++++
.../compaction/TestCompactionStrategyHandler.java | 330 +++++++++++++++
.../job/TestBuiltinIcebergRewriteDataFiles.java | 441 +++++++++++++++++++++
.../job/TestGravitinoCompactionJobAdapter.java | 59 +++
.../recommender/job/TestGravitinoJobSubmitter.java | 97 +++++
.../recommender/job/TestPartitionUtils.java | 220 ++++++++++
.../util/TestQLExpressionEvaluator.java | 170 ++++++++
32 files changed, 2710 insertions(+), 76 deletions(-)
diff --git a/LICENSE.bin b/LICENSE.bin
index ab096465e9..0350cd952d 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -385,6 +385,7 @@
Google FlatBuffers
IPAddress
Aliyun SDK OSS
+ QLExpress
This product bundles various third-party components also under the
Apache Software Foundation License 1.1
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 43cfd73d8d..d013347d58 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -137,6 +137,7 @@ ognl = "3.4.7"
concurrent-trees = "2.6.0"
jakarta-validation = "2.0.2"
aspectj = "1.9.24"
+ql-expression = "4.0.3"
[libraries]
aspectj-aspectjrt = { group = "org.aspectj", name = "aspectjrt", version.ref =
"aspectj" }
@@ -322,6 +323,7 @@ concurrent-trees = { group =
"com.googlecode.concurrent-trees", name = "concurre
jcasbin = { group='org.casbin', name='jcasbin', version.ref="jcasbin" }
openlineage-java= { group = "io.openlineage", name = "openlineage-java",
version.ref = "openlineage" }
ognl = { group='ognl', name='ognl', version.ref="ognl" }
+ql-expression = { group='com.alibaba', name='qlexpress4',
version.ref="ql-expression" }
[bundles]
log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core",
"log4j-12-api", "log4j-layout-template-json"]
diff --git a/maintenance/optimizer/build.gradle.kts
b/maintenance/optimizer/build.gradle.kts
index 638dadbe3f..501b9ff3bb 100644
--- a/maintenance/optimizer/build.gradle.kts
+++ b/maintenance/optimizer/build.gradle.kts
@@ -24,6 +24,12 @@ plugins {
id("idea")
}
+val scalaVersion: String =
+ project.properties["scalaVersion"] as? String ?:
extra["defaultScalaVersion"].toString()
+val sparkVersion: String = libs.versions.spark33.get()
+val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
+val icebergVersion: String = libs.versions.iceberg4connector.get()
+
dependencies {
implementation(project(":api"))
implementation(project(":catalogs:catalog-common"))
@@ -39,12 +45,43 @@ dependencies {
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
implementation(libs.guava)
+ implementation(libs.ql.expression)
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)
+ testImplementation(libs.awaitility)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
+ testImplementation(libs.mockito.core)
+ testImplementation("org.slf4j:slf4j-api:1.7.36")
+ testRuntimeOnly("org.slf4j:slf4j-simple:1.7.36")
+ testRuntimeOnly(
+
"org.scala-lang.modules:scala-collection-compat_$scalaVersion:${libs.versions.scala.collection.compat.get()}"
+ )
+ testImplementation(
+
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion"
+ ) {
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
{
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+
testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
{
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j-impl")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ testImplementation(libs.testcontainers)
testAnnotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)
@@ -86,6 +123,10 @@ tasks {
}
}
+configurations.testRuntimeClasspath {
+ exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j2-impl")
+}
+
tasks.test {
val skipITs = project.hasProperty("skipITs")
if (skipITs) {
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
new file mode 100644
index 0000000000..d5cd5fb4ec
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * Strategy definition for partitioned tables.
+ *
+ * <p>Partition strategies provide additional configuration for aggregating
partition scores and
+ * selecting the number of partitions to evaluate.
+ */
+@DeveloperApi
+public interface PartitionStrategy extends Strategy {
+
+ int DEFAULT_MAX_PARTITION_NUM = 100;
+
+ /**
+ * Partition table score aggregation mode.
+ *
+ * <p>Defaults to {@link ScoreMode#AVG}.
+ *
+ * @return score mode enum
+ */
+ default ScoreMode partitionTableScoreMode() {
+ return ScoreMode.AVG;
+ }
+
+ /**
+ * Maximum number of partitions to include in a table evaluation.
+ *
+ * <p>Defaults to {@value #DEFAULT_MAX_PARTITION_NUM}.
+ *
+ * @return max partition number
+ */
+ default int maxPartitionNum() {
+ return DEFAULT_MAX_PARTITION_NUM;
+ }
+
+ /** Partition table score aggregation mode. */
+ enum ScoreMode {
+ /** Average score of all partitions. */
+ AVG,
+ /** Maximum score of all partitions. */
+ MAX,
+ /** Sum score of all partitions. */
+ SUM,
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
index 0030e7d738..593b6e6f4b 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
@@ -27,7 +27,7 @@ import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandle
* Strategy definition supplied by the control plane. The recommender pulls
strategies from a {@link
*
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider},
routes them to a
* {@link StrategyHandler} by {@link #strategyType()}, and lets the handler
interpret the remaining
- * fields as needed.
+ * fields as needed. For partition-specific behavior, implement {@link
PartitionStrategy}.
*/
@DeveloperApi
public interface Strategy {
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
index ec95fcce66..ff6df4c604 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
@@ -38,14 +38,14 @@ public interface JobExecutionContext {
NameIdentifier nameIdentifier();
/**
- * Free-form job configuration, such as engine parameters (e.g., target file
size bytes).
+ * Free-form job options, such as engine parameters (e.g., target file size
bytes).
*
- * <p>The {@code StrategyHandler} is free to add additional job
configuration besides the job
- * options specified in the strategy.
+ * <p>The {@code StrategyHandler} is free to add additional job options
besides the job options
+ * specified in the strategy.
*
- * @return immutable map of configuration entries
+ * @return immutable map of option entries
*/
- Map<String, String> jobConfig();
+ Map<String, String> jobOptions();
/**
* Job template name to resolve the job in the job submitter.
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
index d9965978ed..fa04dfe45d 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.maintenance.optimizer.api.recommender;
+import java.util.Optional;
+
/**
* Encapsulates the scored result and job execution context for a single
strategy evaluation. The
* recommender ranks evaluations by {@link #score()} before asking the {@link
@@ -27,12 +29,32 @@ package
org.apache.gravitino.maintenance.optimizer.api.recommender;
*/
public interface StrategyEvaluation {
+ /**
+ * Evaluation placeholder indicating that no execution should happen. It
uses score {@code -1} and
+ * an empty job execution context.
+ */
+ StrategyEvaluation NO_EXECUTION =
+ new StrategyEvaluation() {
+ @Override
+ public long score() {
+ return -1L;
+ }
+
+ @Override
+ public Optional<JobExecutionContext> jobExecutionContext() {
+ return Optional.empty();
+ }
+ };
+
/**
* Score used to rank multiple recommendations of the same strategy. Higher
wins; equal scores
* preserve the evaluation order returned by the {@code StrategyHandler}.
*/
long score();
- /** Job execution context for this evaluation. */
- JobExecutionContext jobExecutionContext();
+ /**
+ * Job execution context for this evaluation. Implementations return {@link
Optional#empty()} when
+ * {@link #NO_EXECUTION} is used to signal that no job should be submitted.
+ */
+ Optional<JobExecutionContext> jobExecutionContext();
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index 3f62b85c17..b0890957ac 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -44,6 +44,8 @@ public class OptimizerConfig extends Config {
public static final String GRAVITINO_METALAKE = OPTIMIZER_PREFIX +
"gravitinoMetalake";
public static final String GRAVITINO_DEFAULT_CATALOG =
OPTIMIZER_PREFIX + "gravitinoDefaultCatalog";
+ public static final String JOB_ADAPTER_PREFIX = OPTIMIZER_PREFIX +
"jobAdapter.";
+ public static final String JOB_SUBMITTER_CONFIG_PREFIX = OPTIMIZER_PREFIX +
"jobSubmitterConfig.";
private static final String RECOMMENDER_PREFIX = OPTIMIZER_PREFIX +
"recommender.";
private static final String STATISTICS_PROVIDER = RECOMMENDER_PREFIX +
"statisticsProvider";
@@ -132,4 +134,25 @@ public class OptimizerConfig extends Config {
super(false);
loadFromMap(properties, k -> true);
}
+
+ /**
+ * Returns job submitter custom config entries with the {@code
+ * gravitino.optimizer.jobSubmitterConfig.} prefix stripped.
+ *
+ * @return custom job submitter config map
+ */
+ public Map<String, String> jobSubmitterConfigs() {
+ return getConfigsWithPrefix(JOB_SUBMITTER_CONFIG_PREFIX);
+ }
+
+ public String getStrategyHandlerClassName(String strategyHandlerName) {
+ String configKey =
+ String.format(OPTIMIZER_PREFIX + "strategyHandler.%s.className",
strategyHandlerName);
+ return configMap.get(configKey);
+ }
+
+ public String getJobAdapterClassName(String jobTemplateName) {
+ String configKey = String.format(JOB_ADAPTER_PREFIX + "%s.className",
jobTemplateName);
+ return configMap.get(configKey);
+ }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
index c5a142293a..0602e7f286 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
@@ -79,6 +78,7 @@ public class Recommender implements AutoCloseable {
private final TableMetadataProvider tableMetadataProvider;
private final JobSubmitter jobSubmitter;
private final CloseableGroup closeableGroup = new CloseableGroup();
+ private final OptimizerEnv optimizerEnv;
/**
* Create a recommender whose providers and submitter are resolved from the
optimizer
@@ -93,6 +93,7 @@ public class Recommender implements AutoCloseable {
TableMetadataProvider tableMetadataProvider =
loadTableMetadataProvider(config);
JobSubmitter jobSubmitter = loadJobSubmitter(config);
+ this.optimizerEnv = optimizerEnv;
this.strategyProvider = strategyProvider;
this.statisticsProvider = statisticsProvider;
this.tableMetadataProvider = tableMetadataProvider;
@@ -111,7 +112,10 @@ public class Recommender implements AutoCloseable {
StrategyProvider strategyProvider,
StatisticsProvider statisticsProvider,
TableMetadataProvider tableMetadataProvider,
- JobSubmitter jobSubmitter) {
+ JobSubmitter jobSubmitter,
+ OptimizerEnv optimizerEnv) {
+
+ this.optimizerEnv = optimizerEnv;
this.strategyProvider = strategyProvider;
this.statisticsProvider = statisticsProvider;
this.tableMetadataProvider = tableMetadataProvider;
@@ -139,12 +143,25 @@ public class Recommender implements AutoCloseable {
for (Map.Entry<String, List<NameIdentifier>> entry :
identifiersByStrategyName.entrySet()) {
String strategyName = entry.getKey();
- List<JobExecutionContext> jobConfigs =
+ List<StrategyEvaluation> evaluations =
recommendForOneStrategy(entry.getValue(), strategyName);
- for (JobExecutionContext jobConfig : jobConfigs) {
- String templateName = jobConfig.jobTemplateName();
- String jobId = jobSubmitter.submitJob(templateName, jobConfig);
- LOG.info("Submit job {} for strategy {} with context {}", jobId,
strategyName, jobConfig);
+ for (StrategyEvaluation evaluation : evaluations) {
+ JobExecutionContext jobExecutionContext =
+ evaluation
+ .jobExecutionContext()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Job execution context is missing for evaluation
of strategy "
+ + strategyName));
+ String templateName = jobExecutionContext.jobTemplateName();
+ String jobId = jobSubmitter.submitJob(templateName,
jobExecutionContext);
+ LOG.info(
+ "Submit job {} for strategy {} with context {}",
+ jobId,
+ strategyName,
+ jobExecutionContext);
+ logRecommendation(strategyName, evaluation);
}
}
}
@@ -162,7 +179,7 @@ public class Recommender implements AutoCloseable {
closeableGroup.register(jobSubmitter, "job submitter");
}
- private List<JobExecutionContext> recommendForOneStrategy(
+ private List<StrategyEvaluation> recommendForOneStrategy(
List<NameIdentifier> identifiers, String strategyName) {
LOG.info("Recommend strategy {} for identifiers {}", strategyName,
identifiers);
Strategy strategy = strategyProvider.strategy(strategyName);
@@ -175,6 +192,13 @@ public class Recommender implements AutoCloseable {
continue;
}
StrategyEvaluation evaluation = strategyHandler.evaluate();
+ if (evaluation.score() < 0 ||
evaluation.jobExecutionContext().isEmpty()) {
+ LOG.info(
+ "Skip strategy {} for identifier {} because evaluation score is
negative or job execution context is missing",
+ strategyName,
+ identifier);
+ continue;
+ }
LOG.info(
"Recommend strategy {} for identifier {} score: {}",
strategyName,
@@ -183,9 +207,11 @@ public class Recommender implements AutoCloseable {
scoreQueue.add(evaluation);
}
- return scoreQueue.stream()
- .map(StrategyEvaluation::jobExecutionContext)
- .collect(Collectors.toList());
+ List<StrategyEvaluation> results = new ArrayList<>();
+ while (!scoreQueue.isEmpty()) {
+ results.add(scoreQueue.poll());
+ }
+ return results;
}
private StrategyHandler loadStrategyHandler(Strategy strategy,
NameIdentifier nameIdentifier) {
@@ -251,9 +277,8 @@ public class Recommender implements AutoCloseable {
* by configuration or an explicit registry that maps stable strategy type
strings (for example,
* {@code COMPACTION}) to {@link StrategyHandler} implementations.
*/
- @SuppressWarnings("UnusedVariable")
private String getStrategyHandlerClassName(String strategyType) {
- return "";
+ return optimizerEnv.config().getStrategyHandlerClassName(strategyType);
}
private Map<String, List<NameIdentifier>> getIdentifiersByStrategyName(
@@ -302,4 +327,23 @@ public class Recommender implements AutoCloseable {
+ "configure a statistics provider that implements
SupportTableStatistics.",
statisticsProvider.name()));
}
+
+ private void logRecommendation(String strategyName, StrategyEvaluation
evaluation) {
+ JobExecutionContext jobExecutionContext =
+ evaluation
+ .jobExecutionContext()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Job execution context is missing for evaluation of
strategy "
+ + strategyName));
+ System.out.println(
+ String.format(
+ "RECOMMEND: strategy=%s identifier=%s score=%d jobTemplate=%s
jobOptions=%s",
+ strategyName,
+ jobExecutionContext.nameIdentifier(),
+ evaluation.score(),
+ jobExecutionContext.jobTemplateName(),
+ jobExecutionContext.jobOptions()));
+ }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/BaseExpressionStrategyHandler.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/BaseExpressionStrategyHandler.java
new file mode 100644
index 0000000000..5c0aa59663
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/BaseExpressionStrategyHandler.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.handler;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import lombok.Value;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy;
+import
org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy.ScoreMode;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandler;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandlerContext;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.ExpressionEvaluator;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.QLExpressionEvaluator;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.StatisticsUtils;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import org.apache.gravitino.rel.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base strategy handler that provides common expression evaluation and
statistics handling.
+ *
+ * <p>Subclasses supply strategy-specific logic while relying on the shared
context, statistics
+ * normalization, and expression evaluation utilities.
+ */
+public abstract class BaseExpressionStrategyHandler implements StrategyHandler
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseExpressionStrategyHandler.class);
+ // Sort partitions by score descending (highest score first).
+ private static final Comparator<PartitionScore> PARTITION_SCORE_ORDER =
+ (a, b) -> Long.compare(b.score(), a.score());
+
+ private final ExpressionEvaluator expressionEvaluator;
+ private Strategy strategy;
+ private List<StatisticEntry<?>> tableStatistics;
+ private Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics;
+ private Table tableMetadata;
+ private NameIdentifier nameIdentifier;
+
+ /** Create a handler that evaluates expressions with the default QL
evaluator. */
+ protected BaseExpressionStrategyHandler() {
+ this.expressionEvaluator = new QLExpressionEvaluator();
+ }
+
+ @Override
+ public void initialize(StrategyHandlerContext context) {
+ Preconditions.checkArgument(context.tableMetadata().isPresent(), "Table
metadata is null");
+ this.tableMetadata = context.tableMetadata().get();
+ this.nameIdentifier = context.nameIdentifier();
+ this.strategy = context.strategy();
+ this.tableStatistics = context.tableStatistics();
+ this.partitionStatistics = context.partitionStatistics();
+ }
+
+ @Override
+ public boolean shouldTrigger() {
+ if (isPartitionTable()) {
+ return shouldTriggerForPartitionTable();
+ }
+ return shouldTriggerForNonPartitionTable();
+ }
+
+ @Override
+ public StrategyEvaluation evaluate() {
+ if (isPartitionTable()) {
+ return evaluateForPartitionTable();
+ }
+ return evaluateForNonPartitionTable();
+ }
+
+ /**
+ * Build the execution context for the selected partitions.
+ *
+ * @param nameIdentifier target table identifier
+ * @param strategy strategy being evaluated
+ * @param tableMetadata table metadata requested by the handler
+ * @param partitions selected partitions, empty for non-partitioned tables
+ * @param jobOptions job options derived from the strategy
+ * @return job execution context
+ */
+ protected abstract JobExecutionContext buildJobExecutionContext(
+ NameIdentifier nameIdentifier,
+ Strategy strategy,
+ Table tableMetadata,
+ List<PartitionPath> partitions,
+ Map<String, String> jobOptions);
+
+ private int maxPartitionNum() {
+ if (strategy instanceof PartitionStrategy) {
+ return ((PartitionStrategy) strategy).maxPartitionNum();
+ }
+ return PartitionStrategy.DEFAULT_MAX_PARTITION_NUM;
+ }
+
+ private boolean isPartitionTable() {
+ return tableMetadata.partitioning().length > 0;
+ }
+
+ private boolean shouldTriggerForPartitionTable() {
+ if (partitionStatistics.isEmpty()) {
+ LOG.info("No partition statistics available for table {}",
nameIdentifier);
+ return false;
+ }
+ String triggerExpression = triggerExpression(strategy);
+ return partitionStatistics.values().stream()
+ .anyMatch(partitionStats -> evaluateBool(triggerExpression,
partitionStats));
+ }
+
+ private boolean shouldTriggerForNonPartitionTable() {
+ return evaluateBool(triggerExpression(strategy), tableStatistics);
+ }
+
+ private StrategyEvaluation evaluateForNonPartitionTable() {
+ long score = evaluateLong(scoreExpression(strategy), tableStatistics);
+ if (score <= 0) {
+ return StrategyEvaluation.NO_EXECUTION;
+ }
+ JobExecutionContext jobContext =
+ buildJobExecutionContext(
+ nameIdentifier, strategy, tableMetadata, List.of(),
strategy.jobOptions());
+ return new StrategyEvaluationImpl(score, jobContext);
+ }
+
+ /**
+ * Aggregate partition scores into a table score. The score mode is
controlled by {@link
+ *
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy#PARTITION_TABLE_SCORE_MODE}
+ * and defaults to {@code avg}.
+ */
+ private long getTableScoreFromPartitions(List<PartitionScore>
partitionScores) {
+ if (partitionScores.isEmpty()) {
+ return -1L;
+ }
+ ScoreMode scoreMode = partitionTableScoreMode();
+ switch (scoreMode) {
+ case SUM:
+ return partitionScores.stream().mapToLong(PartitionScore::score).sum();
+ case MAX:
+ return
partitionScores.stream().mapToLong(PartitionScore::score).max().orElse(-1L);
+ case AVG:
+ return partitionScores.stream().mapToLong(PartitionScore::score).sum()
+ / partitionScores.size();
+ default:
+ LOG.warn(
+ "Unsupported partition table score mode '{}' for strategy {},
defaulting to avg",
+ scoreMode,
+ strategy.name());
+ return partitionScores.stream().mapToLong(PartitionScore::score).sum()
+ / partitionScores.size();
+ }
+ }
+
+ private StrategyEvaluation evaluateForPartitionTable() {
+ List<PartitionScore> partitionScores =
getTopPartitionScores(maxPartitionNum());
+ if (partitionScores.isEmpty()) {
+ return StrategyEvaluation.NO_EXECUTION;
+ }
+ List<PartitionPath> partitions =
+
partitionScores.stream().map(PartitionScore::partition).collect(Collectors.toList());
+ JobExecutionContext jobContext =
+ buildJobExecutionContext(
+ nameIdentifier, strategy, tableMetadata, partitions,
strategy.jobOptions());
+ long tableScore = getTableScoreFromPartitions(partitionScores);
+ return new StrategyEvaluationImpl(tableScore, jobContext);
+ }
+
+ private ScoreMode partitionTableScoreMode() {
+ if (strategy instanceof PartitionStrategy) {
+ return ((PartitionStrategy) strategy).partitionTableScoreMode();
+ }
+ return ScoreMode.AVG;
+ }
+
+ private long evaluateLong(String expression, List<StatisticEntry<?>>
statistics) {
+ Map<String, Object> context = buildExpressionContext(strategy, statistics);
+ try {
+ return expressionEvaluator.evaluateLong(expression, context);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to evaluate expression '{}' with context {}",
expression, context, e);
+ return -1L;
+ }
+ }
+
+ private boolean evaluateBool(String expression, List<StatisticEntry<?>>
statistics) {
+ Map<String, Object> context = buildExpressionContext(strategy, statistics);
+ try {
+ return expressionEvaluator.evaluateBool(expression, context);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to evaluate expression '{}' with context {}",
expression, context, e);
+ return false;
+ }
+ }
+
+ private static Map<String, Object> buildExpressionContext(
+ Strategy strategy, List<StatisticEntry<?>> statistics) {
+ Map<String, Object> context = new HashMap<>();
+ context.putAll(StatisticsUtils.buildStatisticsContext(statistics));
+ strategy
+ .rules()
+ .forEach(
+ (k, v) -> {
+ try {
+ context.put(k, Long.parseLong(v.toString()));
+ } catch (NumberFormatException e) {
+ // Ignore non-numeric rule values when building numeric
expression inputs.
+ }
+ });
+ return context;
+ }
+
+ private String triggerExpression(Strategy strategy) {
+ return StrategyUtils.getTriggerExpression(strategy);
+ }
+
+ private String scoreExpression(Strategy strategy) {
+ return StrategyUtils.getScoreExpression(strategy);
+ }
+
+ /**
+ * Return the highest-scoring partitions in descending order.
+ *
+ * @param limit max partitions to return
+ * @return top partition scores, empty when none score above zero
+ */
+ private List<PartitionScore> getTopPartitionScores(int limit) {
+ if (limit <= 0) {
+ return List.of();
+ }
+ PriorityQueue<PartitionScore> scoreQueue =
+ new PriorityQueue<>(limit, PARTITION_SCORE_ORDER.reversed());
+ partitionStatistics.forEach(
+ (partitionPath, statistics) -> {
+ boolean trigger = evaluateBool(triggerExpression(strategy),
statistics);
+ if (trigger) {
+ long partitionScore = evaluateLong(scoreExpression(strategy),
statistics);
+ if (partitionScore > 0) {
+ PartitionScore entry = new PartitionScore(partitionPath,
partitionScore);
+ if (scoreQueue.size() < limit) {
+ scoreQueue.add(entry);
+ } else if (scoreQueue.peek() != null && partitionScore >
scoreQueue.peek().score()) {
+ scoreQueue.poll();
+ scoreQueue.add(entry);
+ }
+ }
+ }
+ });
+
+ return
scoreQueue.stream().sorted(PARTITION_SCORE_ORDER).collect(Collectors.toList());
+ }
+
+ @Value
+ @Accessors(fluent = true)
+ private static final class PartitionScore {
+ PartitionPath partition;
+ long score;
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/StrategyEvaluationImpl.java
similarity index 53%
copy from
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
copy to
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/StrategyEvaluationImpl.java
index d9965978ed..6f7caac1d9 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/StrategyEvaluationImpl.java
@@ -17,22 +17,26 @@
* under the License.
*/
package org.apache.gravitino.maintenance.optimizer.recommender.handler;

import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
import org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;

/**
 * Default {@link StrategyEvaluation}: pairs a ranking score with an optional job execution
 * context. A {@code null} context is surfaced to callers as {@link Optional#empty()}.
 */
@RequiredArgsConstructor
final class StrategyEvaluationImpl implements StrategyEvaluation {

  // Score used to rank this evaluation; exposed fluently as score().
  @Accessors(fluent = true)
  @Getter
  private final long score;

  // May be null when the evaluation produced no executable job.
  private final JobExecutionContext jobExecutionContext;

  @Override
  public Optional<JobExecutionContext> jobExecutionContext() {
    return Optional.ofNullable(jobExecutionContext);
  }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionJobContext.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionJobContext.java
new file mode 100644
index 0000000000..f53000c455
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionJobContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;

import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.transforms.Transform;

/**
 * Job execution context for compaction: carries the target table identifier, job options, the
 * job template name, and the table schema/partitioning needed to build partition WHERE clauses.
 *
 * <p>NOTE(review): the {@code columns}/{@code partitioning} arrays and the {@code partitions}
 * list are stored and exposed without copying; callers are assumed not to mutate them — confirm.
 */
@RequiredArgsConstructor
@ToString
public class CompactionJobContext implements JobExecutionContext {
  // Identifier of the table the compaction job targets.
  private final NameIdentifier name;
  // Options forwarded to the job, e.g. target-file-size-bytes.
  private final Map<String, String> jobOptions;
  // Gravitino job template used to launch the job.
  private final String jobTemplateName;
  // Table columns, used to resolve partition column types when building predicates.
  @Getter private final Column[] columns;
  // Partitioning transforms, positionally aligned with each partition's entries.
  @Getter private final Transform[] partitioning;
  // Partitions selected for compaction; empty yields an empty WHERE clause.
  @Getter private final List<PartitionPath> partitions;

  @Override
  public NameIdentifier nameIdentifier() {
    return name;
  }

  @Override
  public Map<String, String> jobOptions() {
    return jobOptions;
  }

  @Override
  public String jobTemplateName() {
    return jobTemplateName;
  }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java
new file mode 100644
index 0000000000..774e2a8119
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;

import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
import org.apache.gravitino.maintenance.optimizer.recommender.handler.BaseExpressionStrategyHandler;
import org.apache.gravitino.rel.Table;

/**
 * Expression-based strategy handler for the {@code compaction} strategy type. Builds {@link
 * CompactionJobContext} instances from table metadata and the selected partitions.
 */
public class CompactionStrategyHandler extends BaseExpressionStrategyHandler {

  /** Strategy type name used for registration and lookup. */
  public static final String NAME = "compaction";

  /** Compaction requires table metadata plus table- and partition-level statistics. */
  @Override
  public Set<DataRequirement> dataRequirements() {
    Set<DataRequirement> requirements =
        EnumSet.of(
            DataRequirement.TABLE_METADATA,
            DataRequirement.TABLE_STATISTICS,
            DataRequirement.PARTITION_STATISTICS);
    return requirements;
  }

  @Override
  public String strategyType() {
    return NAME;
  }

  /** Builds the compaction job context; a {@code null} partition list is treated as empty. */
  @Override
  protected JobExecutionContext buildJobExecutionContext(
      NameIdentifier nameIdentifier,
      Strategy strategy,
      Table tableMetadata,
      List<PartitionPath> partitions,
      Map<String, String> jobOptions) {
    List<PartitionPath> selectedPartitions = (partitions == null) ? List.of() : partitions;
    return new CompactionJobContext(
        nameIdentifier,
        jobOptions,
        strategy.jobTemplateName(),
        tableMetadata.columns(),
        tableMetadata.partitioning(),
        selectedPartitions);
  }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoCompactionJobAdapter.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoCompactionJobAdapter.java
new file mode 100644
index 0000000000..6ca9e67181
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoCompactionJobAdapter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.gravitino.maintenance.optimizer.recommender.job;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
import org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;

/**
 * Translates a {@link CompactionJobContext} into the flat string config map expected by the
 * Gravitino compaction job template.
 */
public class GravitinoCompactionJobAdapter implements GravitinoJobAdapter {

  // Shared, thread-safe mapper; key ordering keeps the emitted options JSON deterministic.
  private static final ObjectMapper MAPPER =
      new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  /**
   * Builds the job config for a compaction run.
   *
   * @param jobExecutionContext must be a {@link CompactionJobContext}
   * @return config with table identifier, WHERE clause, sort order, strategy, and options JSON
   * @throws IllegalArgumentException if the context is not a {@link CompactionJobContext}
   */
  @Override
  public Map<String, String> jobConfig(JobExecutionContext jobExecutionContext) {
    Preconditions.checkArgument(
        jobExecutionContext instanceof CompactionJobContext,
        // Fixed: the message previously referenced a nonexistent "CompactionJobExecutionContext".
        "jobExecutionContext must be CompactionJobContext");
    CompactionJobContext jobContext = (CompactionJobContext) jobExecutionContext;
    return ImmutableMap.of(
        "table_identifier", getTableName(jobContext),
        "where_clause", getWhereClause(jobContext),
        "sort_order", "",
        "strategy", "binpack",
        "options", getOptions(jobContext));
  }

  /** Table name with the catalog stripped, as expected by the job template. */
  private String getTableName(CompactionJobContext jobContext) {
    return IdentifierUtils.removeCatalogFromIdentifier(jobContext.nameIdentifier()).toString();
  }

  /**
   * Builds the WHERE clause restricting compaction to the selected partitions; empty string
   * means the whole table.
   */
  private String getWhereClause(CompactionJobContext jobContext) {
    if (jobContext.getPartitions().isEmpty()) {
      return "";
    }
    // Generate the WHERE clause from jobContext.getPartitions():
    // 1. resolve each partition column's type from the schema
    // 2. build the partition filter expression, like day(xxx)
    // 3. build the partition filter value, like '2023-10-01', TIMESTAMP 'xxx'
    return PartitionUtils.getWhereClauseForPartitions(
        jobContext.getPartitions(), jobContext.getColumns(), jobContext.getPartitioning());
  }

  /** Serializes the job options map to a deterministic JSON string. */
  private String getOptions(JobExecutionContext jobExecutionContext) {
    Map<String, String> map = jobExecutionContext.jobOptions();
    return convertMapToJson(map);
  }

  // TreeMap sorts keys so equal maps always produce identical JSON.
  private static String convertMapToJson(Map<String, ?> map) {
    if (map == null || map.isEmpty()) {
      return "{}";
    }
    try {
      return MAPPER.writeValueAsString(new TreeMap<>(map));
    } catch (JsonProcessingException e) {
      throw new IllegalStateException("Failed to serialize options to JSON", e);
    }
  }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
index 455d51795d..07ee840ed4 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/GravitinoJobSubmitter.java
@@ -20,12 +20,17 @@
package org.apache.gravitino.maintenance.optimizer.recommender.job;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.client.GravitinoClient;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
import
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+import
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
/** Submits optimizer jobs to Gravitino using job template adapters. */
public class GravitinoJobSubmitter implements JobSubmitter {
@@ -33,14 +38,17 @@ public class GravitinoJobSubmitter implements JobSubmitter {
public static final String NAME = "gravitino-job-submitter";
private GravitinoClient gravitinoClient;
-
- private final Map<String, Class<? extends GravitinoJobAdapter>> jobAdapters
= Map.of();
+ private OptimizerEnv optimizerEnv;
+ private OptimizerConfig optimizerConfig;
/**
* Returns the provider name for configuration lookup.
*
* @return provider name
*/
+ private final Map<String, Class<? extends GravitinoJobAdapter>> jobAdapters =
+ ImmutableMap.of(CompactionStrategyHandler.NAME,
GravitinoCompactionJobAdapter.class);
+
@Override
public String name() {
return NAME;
@@ -53,7 +61,8 @@ public class GravitinoJobSubmitter implements JobSubmitter {
*/
@Override
public void initialize(OptimizerEnv optimizerEnv) {
- this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
+ this.optimizerEnv = optimizerEnv;
+ this.optimizerConfig = optimizerEnv.config();
}
/**
@@ -65,9 +74,10 @@ public class GravitinoJobSubmitter implements JobSubmitter {
*/
@Override
public String submitJob(String jobTemplateName, JobExecutionContext
jobExecutionContext) {
+ ensureClientInitialized();
GravitinoJobAdapter jobAdapter = loadJobAdapter(jobTemplateName);
return gravitinoClient
- .runJob(jobTemplateName, jobAdapter.jobConfig(jobExecutionContext))
+ .runJob(jobTemplateName, buildJobConfig(optimizerConfig,
jobExecutionContext, jobAdapter))
.jobId();
}
@@ -79,11 +89,67 @@ public class GravitinoJobSubmitter implements JobSubmitter {
}
}
+ private void ensureClientInitialized() {
+ if (gravitinoClient == null) {
+ if (optimizerEnv == null) {
+ throw new IllegalStateException("Job submitter is not initialized");
+ }
+ this.gravitinoClient = GravitinoClientUtils.createClient(optimizerEnv);
+ }
+ }
+
+ /**
+ * Merge job configs with precedence: optimizer config < adapter config.
+ *
+ * <p>Typical use cases:
+ *
+ * <ul>
+ * <li>Optimizer config: shared engine/runtime defaults (for example,
Spark settings).
+ * <li>Adapter config: adapter-specific parameters (for example, WHERE
filters) required by the
+ * job template.
+ * </ul>
+ */
+ @VisibleForTesting
+ static Map<String, String> buildJobConfig(
+ OptimizerConfig optimizerConfig,
+ JobExecutionContext jobExecutionContext,
+ GravitinoJobAdapter jobAdapter) {
+ Map<String, String> submitterConfigs =
+ optimizerConfig == null ? Map.of() :
optimizerConfig.jobSubmitterConfigs();
+ Map<String, String> adapterConfigs =
+ jobAdapter == null ? Map.of() :
jobAdapter.jobConfig(jobExecutionContext);
+
+ Map<String, String> mergedConfigs = new LinkedHashMap<>();
+ mergedConfigs.putAll(submitterConfigs);
+ mergedConfigs.putAll(adapterConfigs);
+ return mergedConfigs;
+ }
+
@VisibleForTesting
GravitinoJobAdapter loadJobAdapter(String jobTemplateName) {
Class<? extends GravitinoJobAdapter> jobAdapterClz =
jobAdapters.get(jobTemplateName);
if (jobAdapterClz == null) {
- throw new IllegalArgumentException("No job adapter found for template: "
+ jobTemplateName);
+ String jobAdapterClassName =
+ optimizerConfig == null ? null :
optimizerConfig.getJobAdapterClassName(jobTemplateName);
+ if (StringUtils.isBlank(jobAdapterClassName)) {
+ throw new IllegalArgumentException("No job adapter found for template:
" + jobTemplateName);
+ }
+ try {
+ Class<?> rawClass = Class.forName(jobAdapterClassName);
+ if (!GravitinoJobAdapter.class.isAssignableFrom(rawClass)) {
+ throw new IllegalArgumentException(
+ "Configured job adapter class does not implement
GravitinoJobAdapter: "
+ + jobAdapterClassName);
+ }
+ jobAdapterClz = rawClass.asSubclass(GravitinoJobAdapter.class);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to load job adapter class '"
+ + jobAdapterClassName
+ + "' for template: "
+ + jobTemplateName,
+ e);
+ }
}
try {
return jobAdapterClz.getDeclaredConstructor().newInstance();
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/PartitionUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/PartitionUtils.java
new file mode 100644
index 0000000000..1d6251ab40
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/job/PartitionUtils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.gravitino.maintenance.optimizer.recommender.job;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.types.Type;

/** Utilities for turning partition metadata into SQL WHERE clauses. */
public class PartitionUtils {

  private PartitionUtils() {
    // Static utility class; not instantiable.
  }

  /**
   * Builds a WHERE clause matching any of the given partitions, e.g. {@code "(a = 'x1' AND b =
   * 'y1') OR (a = 'x2' AND b = 'y2')"}.
   *
   * @param partitions partitions to match, non-empty
   * @param columns table columns used to resolve value types, non-empty
   * @param partitioning partitioning transforms aligned with each partition's entries, non-empty
   * @return OR-joined partition predicates, each wrapped in parentheses
   */
  public static String getWhereClauseForPartitions(
      List<PartitionPath> partitions, Column[] columns, Transform[] partitioning) {
    Preconditions.checkArgument(
        partitions != null && !partitions.isEmpty(), "partitions cannot be null or empty");
    Preconditions.checkArgument(ArrayUtils.isNotEmpty(columns), "columns cannot be null or empty");
    Preconditions.checkArgument(
        partitioning != null && partitioning.length > 0, "partitioning cannot be null or empty");

    List<String> predicates = Lists.newArrayListWithExpectedSize(partitions.size());
    for (PartitionPath partition : partitions) {
      predicates.add(getWhereClauseForPartition(partition.entries(), columns, partitioning));
    }
    // Wrap each partition-level predicate in parentheses to preserve logical grouping
    // e.g. "(col1 = v1 AND col2 = v2) OR (col1 = v3 AND col2 = v4)"
    return predicates.stream().map(p -> "(" + p + ")").collect(Collectors.joining(" OR "));
  }

  /**
   * Builds the AND-joined predicate for a single partition.
   *
   * <p>For Identity transform, the clause is like "columnName = value"; for bucket,
   * "bucket(columnName, numBuckets) = value"; for truncate, "truncate(columnName, width) =
   * value"; for year/month/day/hour, "year(columnName) = value". Values come from {@code
   * PartitionEntry.partitionValue()}: string types are single-quoted, numbers are left bare, and
   * date/time types are prefixed with the type keyword, e.g. TIMESTAMP '2024-01-01'.
   *
   * @param partition partition entries, one per partitioning transform
   * @param columns table columns used to resolve value types
   * @param partitioning partitioning transforms, same size as {@code partition}
   * @return AND-joined predicate, e.g. "a = 'x1' AND b = 'y1'"
   */
  public static String getWhereClauseForPartition(
      List<PartitionEntry> partition, Column[] columns, Transform[] partitioning) {
    Preconditions.checkArgument(
        partition != null && !partition.isEmpty(), "partition cannot be null or empty");
    Preconditions.checkArgument(ArrayUtils.isNotEmpty(columns), "columns cannot be null or empty");
    Preconditions.checkArgument(
        partitioning != null && partitioning.length == partition.size(),
        "partitioning must match the size of partition entries");

    // Case-insensitive column lookup; first definition wins on duplicates.
    Map<String, Column> columnMap =
        Arrays.stream(columns)
            .collect(
                Collectors.toMap(
                    c -> c.name().toLowerCase(Locale.ROOT), Function.identity(), (l, r) -> l));

    List<String> predicates = Lists.newArrayListWithExpectedSize(partition.size());
    for (int i = 0; i < partition.size(); i++) {
      PartitionEntry entry = partition.get(i);
      Transform transform = partitioning[i];

      String columnName = getColumnName(transform);
      Column column =
          Preconditions.checkNotNull(
              columnMap.get(columnName.toLowerCase(Locale.ROOT)),
              "Column '%s' not found in table schema",
              columnName);

      String expression = buildExpression(transform, columnName);
      String literal = formatLiteral(transform, column, entry.partitionValue());
      predicates.add(String.format("%s = %s", expression, literal));
    }

    return String.join(" AND ", predicates);
  }

  /** Extracts the (dotted) source column name referenced by a partitioning transform. */
  private static String getColumnName(Transform transform) {
    if (transform instanceof Transform.SingleFieldTransform) {
      return joinFieldName(((Transform.SingleFieldTransform) transform).fieldName());
    } else if (transform instanceof Transforms.TruncateTransform) {
      return joinFieldName(((Transforms.TruncateTransform) transform).fieldName());
    } else if (transform instanceof Transforms.BucketTransform) {
      String[][] fieldNames = ((Transforms.BucketTransform) transform).fieldNames();
      Preconditions.checkArgument(
          fieldNames.length > 0, "Bucket transform must have at least one field");
      return joinFieldName(fieldNames[0]);
    }
    throw new IllegalArgumentException("Unsupported transform: " + transform.getClass().getName());
  }

  /** Builds the left-hand side of the predicate for the given transform. */
  private static String buildExpression(Transform transform, String columnName) {
    if (transform instanceof Transforms.IdentityTransform) {
      return columnName;
    } else if (transform instanceof Transforms.YearTransform
        || transform instanceof Transforms.MonthTransform
        || transform instanceof Transforms.DayTransform
        || transform instanceof Transforms.HourTransform) {
      return transform.name() + "(" + columnName + ")";
    } else if (transform instanceof Transforms.BucketTransform) {
      return "bucket("
          + columnName
          + ", "
          + ((Transforms.BucketTransform) transform).numBuckets()
          + ")";
    } else if (transform instanceof Transforms.TruncateTransform) {
      return "truncate("
          + columnName
          + ", "
          + ((Transforms.TruncateTransform) transform).width()
          + ")";
    }
    throw new IllegalArgumentException("Unsupported transform: " + transform.getClass().getName());
  }

  /** Formats the right-hand side literal according to the transform and column type. */
  private static String formatLiteral(Transform transform, Column column, String rawValue) {
    // Time-unit and bucket transforms always produce integer values; emit them bare.
    if (transform instanceof Transforms.YearTransform
        || transform instanceof Transforms.MonthTransform
        || transform instanceof Transforms.DayTransform
        || transform instanceof Transforms.HourTransform
        || transform instanceof Transforms.BucketTransform) {
      return rawValue;
    }

    Type type = column.dataType();
    switch (type.name()) {
      case BOOLEAN:
        return rawValue.toLowerCase(Locale.ROOT);
      case BYTE:
      case SHORT:
      case INTEGER:
      case LONG:
      case FLOAT:
      case DOUBLE:
      case DECIMAL:
        return rawValue;
      case DATE:
        return "DATE '" + rawValue + "'";
      case TIMESTAMP:
        return "TIMESTAMP '" + rawValue + "'";
      case TIME:
        return "TIME '" + rawValue + "'";
      default:
        // SQL string literals use single quotes (double quotes denote identifiers in
        // standard SQL); previously double quotes were emitted here, producing a = "x1"
        // instead of the intended a = 'x1'.
        return "'" + escapeStringLiteral(rawValue) + "'";
    }
  }

  /** Escapes a value for a single-quoted SQL string literal by doubling embedded quotes. */
  private static String escapeStringLiteral(String value) {
    return value.replace("'", "''");
  }

  /** Joins a nested field-name path into dotted form, e.g. {"a","b"} -> "a.b". */
  private static String joinFieldName(String[] fieldNameParts) {
    return String.join(".", fieldNameParts);
  }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
index 394691d286..76bb7ac8db 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/statistics/GravitinoStatisticsProvider.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.maintenance.optimizer.recommender.statistics;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -113,7 +114,7 @@ public class GravitinoStatisticsProvider implements
SupportTableStatistics {
.forEach(
statistic ->
statisticsByPartition
- .computeIfAbsent(partitions, key -> new
java.util.ArrayList<>())
+ .computeIfAbsent(partitions, key -> new ArrayList<>())
.add(new StatisticEntryImpl<>(statistic.name(),
statistic.value().get())));
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
index 0dfb665c17..a66bb43b7a 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java
@@ -23,18 +23,29 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy;
+import
org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy.ScoreMode;
import org.apache.gravitino.policy.Policy;
import org.apache.gravitino.policy.PolicyContent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Strategy implementation backed by a Gravitino policy. */
-public class GravitinoStrategy implements Strategy {
+public class GravitinoStrategy implements PartitionStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoStrategy.class);
@VisibleForTesting public static final String STRATEGY_TYPE_KEY =
"strategy.type";
@VisibleForTesting public static final String JOB_TEMPLATE_NAME_KEY =
"job.template-name";
private static final String JOB_OPTIONS_PREFIX = "job.options.";
+ /** Rule key for the partition table score aggregation mode. */
+ public static final String PARTITION_TABLE_SCORE_MODE =
"partition_table_score_mode";
+ /** Rule key for the maximum number of partitions selected for execution. */
+ public static final String MAX_PARTITION_NUM = "max_partition_num";
+
+ private static final int DEFAULT_MAX_PARTITION_NUM = 100;
private final Policy policy;
@@ -118,4 +129,51 @@ public class GravitinoStrategy implements Strategy {
return
Optional.ofNullable(policy.content().properties().get(JOB_TEMPLATE_NAME_KEY))
.orElseThrow(() -> new IllegalArgumentException("job.template-name is
not set"));
}
+
+ @Override
+ public ScoreMode partitionTableScoreMode() {
+ Object value = rules().get(PARTITION_TABLE_SCORE_MODE);
+ if (value == null) {
+ return ScoreMode.AVG;
+ }
+ if (value instanceof ScoreMode) {
+ return (ScoreMode) value;
+ }
+ String mode = value.toString().trim().toLowerCase();
+ if (mode.isEmpty()) {
+ return ScoreMode.AVG;
+ }
+ switch (mode) {
+ case "sum":
+ return ScoreMode.SUM;
+ case "max":
+ return ScoreMode.MAX;
+ case "avg":
+ return ScoreMode.AVG;
+ default:
+ LOG.warn(
+ "Unsupported partition table score mode '{}' for strategy {},
defaulting to avg",
+ mode,
+ name());
+ return ScoreMode.AVG;
+ }
+ }
+
+ @Override
+ public int maxPartitionNum() {
+ Object value = rules().get(MAX_PARTITION_NUM);
+ if (value == null) {
+ return DEFAULT_MAX_PARTITION_NUM;
+ }
+ String limit = value.toString().trim();
+ if (limit.isEmpty()) {
+ return DEFAULT_MAX_PARTITION_NUM;
+ }
+ try {
+ int parsed = Integer.parseInt(limit);
+ return parsed > 0 ? parsed : DEFAULT_MAX_PARTITION_NUM;
+ } catch (NumberFormatException e) {
+ return DEFAULT_MAX_PARTITION_NUM;
+ }
+ }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/ExpressionEvaluator.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/ExpressionEvaluator.java
new file mode 100644
index 0000000000..71d8a4d6e1
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/ExpressionEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.gravitino.maintenance.optimizer.recommender.util;

import java.util.Map;

/**
 * Evaluates rule expressions against a caller-supplied variable context.
 *
 * <p>The {@code context} map binds variable names to values; implementations resolve those
 * bindings while computing boolean or numeric results. Callers must supply every variable the
 * expression references.
 */
public interface ExpressionEvaluator {

  /**
   * Evaluates a boolean-valued expression.
   *
   * @param expression the expression text to evaluate
   * @param context variable name-to-value bindings for the expression
   * @return the boolean evaluation result
   */
  boolean evaluateBool(String expression, Map<String, Object> context);

  /**
   * Evaluates a numeric expression and coerces the result to {@code long}.
   *
   * @param expression the expression text to evaluate
   * @param context variable name-to-value bindings for the expression
   * @return the numeric evaluation result as a {@code long}
   */
  long evaluateLong(String expression, Map<String, Object> context);
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/QLExpressionEvaluator.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/QLExpressionEvaluator.java
new file mode 100644
index 0000000000..cd7609ebca
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/QLExpressionEvaluator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import com.alibaba.qlexpress4.Express4Runner;
+import com.alibaba.qlexpress4.InitOptions;
+import com.alibaba.qlexpress4.QLOptions;
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
+public class QLExpressionEvaluator implements ExpressionEvaluator {
+ private static final Express4Runner RUNNER = new
Express4Runner(InitOptions.DEFAULT_OPTIONS);
+
+ @Override
+ public long evaluateLong(String expression, Map<String, Object> context) {
+ return toLong(evaluate(expression, context));
+ }
+
+ @Override
+ public boolean evaluateBool(String expression, Map<String, Object> context) {
+ return (boolean) evaluate(expression, context);
+ }
+
+ private Object evaluate(String expression, Map<String, Object> context) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(expression),
"expression is blank");
+ Preconditions.checkArgument(context != null, "context is null");
+ String formattedExpression = formatExpression(expression, context);
+ return RUNNER
+ .execute(formattedExpression, formatContextKey(context),
QLOptions.DEFAULT_OPTIONS)
+ .getResult();
+ }
+
+ private Map<String, Object> formatContextKey(Map<String, Object> context) {
+ return context.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> normalizeIdentifier(entry.getKey()), entry ->
entry.getValue()));
+ }
+
+ private String formatExpression(String expression, Map<String, Object>
context) {
+ Map<String, String> replacements =
+ context.keySet().stream()
+ .collect(
+ Collectors.toMap(key -> key, this::normalizeIdentifier, (left,
right) -> left));
+ replacements.entrySet().removeIf(entry ->
entry.getKey().equals(entry.getValue()));
+ if (replacements.isEmpty()) {
+ return expression;
+ }
+
+ String alternation =
+
replacements.keySet().stream().map(Pattern::quote).collect(Collectors.joining("|"));
+ Pattern pattern = Pattern.compile("(?<![A-Za-z0-9_])(" + alternation +
")(?![A-Za-z0-9_])");
+ Matcher matcher = pattern.matcher(expression);
+ StringBuffer buffer = new StringBuffer();
+ while (matcher.find()) {
+ String matched = matcher.group(1);
+ matcher.appendReplacement(
+ buffer, Matcher.quoteReplacement(replacements.getOrDefault(matched,
matched)));
+ }
+ matcher.appendTail(buffer);
+ return buffer.toString();
+ }
+
+ private String normalizeIdentifier(String name) {
+ return name.replace("-", "_");
+ }
+
+ private Long toLong(Object obj) {
+ if (obj instanceof Long) {
+ return (Long) obj;
+ }
+
+ if (obj instanceof Integer) {
+ return ((Integer) obj).longValue();
+ }
+
+ if (obj instanceof BigDecimal) {
+ return ((BigDecimal) obj).longValue();
+ }
+
+ if (obj instanceof Number) {
+ if (obj instanceof Double || obj instanceof Float) {
+ return Math.round(((Number) obj).doubleValue());
+ }
+ return ((Number) obj).longValue();
+ }
+
+ throw new IllegalArgumentException("Object cannot be converted to Long");
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StatisticsUtils.java
similarity index 52%
copy from
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
copy to
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StatisticsUtils.java
index d9965978ed..2fa683646b 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StatisticsUtils.java
@@ -17,22 +17,28 @@
* under the License.
*/
-package org.apache.gravitino.maintenance.optimizer.api.recommender;
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
-/**
- * Encapsulates the scored result and job execution context for a single
strategy evaluation. The
- * recommender ranks evaluations by {@link #score()} before asking the {@link
- * org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter
JobSubmitter} to launch
- * work.
- */
-public interface StrategyEvaluation {
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+public class StatisticsUtils {
- /**
- * Score used to rank multiple recommendations of the same strategy. Higher
wins; equal scores
- * preserve the evaluation order returned by the {@code StrategyHandler}.
- */
- long score();
+ private StatisticsUtils() {}
- /** Job execution context for this evaluation. */
- JobExecutionContext jobExecutionContext();
+ public static Map<String, Object> buildStatisticsContext(
+ List<StatisticEntry<?>> tableStatistics) {
+ Map<String, Object> context = new HashMap<>();
+ if (tableStatistics == null) {
+ return context;
+ }
+ for (StatisticEntry<?> statistic : tableStatistics) {
+ if (statistic != null && statistic.name() != null) {
+ context.put(statistic.name(), statistic.value().value());
+ }
+ }
+ return context;
+ }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StrategyUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StrategyUtils.java
new file mode 100644
index 0000000000..93106f22ff
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/StrategyUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+
+/** Utility methods and rule keys for interpreting optimizer strategies. */
+public class StrategyUtils {
+
+ /** Prefix for job option keys exposed as rules. */
+ public static final String JOB_ROLE_PREFIX = "job.";
+ /** Rule key for the trigger expression. */
+ public static final String TRIGGER_EXPR = "trigger-expr";
+ /** Rule key for the score expression. */
+ public static final String SCORE_EXPR = "score-expr";
+
+ private static final String DEFAULT_TRIGGER_EXPR = "false";
+ private static final String DEFAULT_SCORE_EXPR = "-1";
+ /**
+ * Resolve the trigger expression for a strategy.
+ *
+ * @param strategy strategy definition
+ * @return trigger expression or {@code false} by default
+ */
+ public static String getTriggerExpression(Strategy strategy) {
+ Object value = strategy.rules().get(TRIGGER_EXPR);
+ if (value == null) {
+ return DEFAULT_TRIGGER_EXPR;
+ }
+ String expression = value.toString();
+ return expression.trim().isEmpty() ? DEFAULT_TRIGGER_EXPR : expression;
+ }
+
+ /**
+ * Resolve the score expression for a strategy.
+ *
+ * @param strategy strategy definition
+ * @return score expression or {@code -1} by default
+ */
+ public static String getScoreExpression(Strategy strategy) {
+ Object value = strategy.rules().get(SCORE_EXPR);
+ if (value == null) {
+ return DEFAULT_SCORE_EXPR;
+ }
+ String expression = value.toString();
+ return expression.trim().isEmpty() ? DEFAULT_SCORE_EXPR : expression;
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
index c84d6954c3..903e62d77e 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/conf/TestOptimizerConfig.java
@@ -53,4 +53,20 @@ class TestOptimizerConfig {
() -> config.get(OptimizerConfig.GRAVITINO_METALAKE_CONFIG));
Assertions.assertNull(config.get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG));
}
+
+ @Test
+ void testJobSubmitterConfigsWithPrefix() {
+ Map<String, String> properties =
+ Map.of(
+ OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark.master",
+ "yarn",
+ OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "queue",
+ "default",
+ OptimizerConfig.GRAVITINO_URI,
+ "http://example.com");
+ OptimizerConfig config = new OptimizerConfig(properties);
+
+ Assertions.assertEquals(
+ Map.of("spark.master", "yarn", "queue", "default"),
config.jobSubmitterConfigs());
+ }
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
index 6fcfcbe4fd..c49695d4b6 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestRecommenderOrdering.java
@@ -22,6 +22,7 @@ package
org.apache.gravitino.maintenance.optimizer.recommender;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.PriorityQueue;
import org.apache.gravitino.NameIdentifier;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
@@ -55,7 +56,7 @@ class TestRecommenderOrdering {
List<JobExecutionContext> ordered = new ArrayList<>(scoreQueue.size());
while (!scoreQueue.isEmpty()) {
- ordered.add(scoreQueue.poll().jobExecutionContext());
+ ordered.add(scoreQueue.poll().jobExecutionContext().orElseThrow());
}
return ordered;
}
@@ -68,23 +69,24 @@ class TestRecommenderOrdering {
}
@Override
- public JobExecutionContext jobExecutionContext() {
- return new JobExecutionContext() {
- @Override
- public NameIdentifier nameIdentifier() {
- return identifier;
- }
+ public Optional<JobExecutionContext> jobExecutionContext() {
+ return Optional.of(
+ new JobExecutionContext() {
+ @Override
+ public NameIdentifier nameIdentifier() {
+ return identifier;
+ }
- @Override
- public Map<String, String> jobConfig() {
- return Map.of();
- }
+ @Override
+ public Map<String, String> jobOptions() {
+ return Map.of();
+ }
- @Override
- public String jobTemplateName() {
- return "template";
- }
- };
+ @Override
+ public String jobTemplateName() {
+ return "template";
+ }
+ });
}
};
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
index 8479458b45..aa3e0bda6e 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/TestStrategyFiltering.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.maintenance.optimizer.recommender;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -68,7 +69,7 @@ class TestStrategyFiltering {
.forEach(
strategy ->
identifiersByStrategyName
- .computeIfAbsent(strategy.name(), key -> new
java.util.ArrayList<>())
+ .computeIfAbsent(strategy.name(), key -> new
ArrayList<>())
.add(identifier));
}
return identifiersByStrategyName;
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java
new file mode 100644
index 0000000000..1934b0f215
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+
+public class CompactionStrategyForTest implements Strategy {
+
+ static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+ @Override
+ public String name() {
+ return "compaction-policy-for-test";
+ }
+
+ @Override
+ public String strategyType() {
+ return "compaction";
+ }
+
+ @Override
+ public Map<String, Object> rules() {
+ return ImmutableMap.of(
+ "min_datafile_mse",
+ 1000,
+ StrategyUtils.JOB_ROLE_PREFIX + TARGET_FILE_SIZE_BYTES,
+ 1024,
+ StrategyUtils.TRIGGER_EXPR,
+ "datafile_mse > min_datafile_mse",
+ StrategyUtils.SCORE_EXPR,
+ "datafile_mse * delete_file_num");
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return Map.of();
+ }
+
+ @Override
+ public Map<String, String> jobOptions() {
+ return Map.of(TARGET_FILE_SIZE_BYTES, "1024");
+ }
+
+ @Override
+ public String jobTemplateName() {
+ return "compaction-template";
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java
new file mode 100644
index 0000000000..5e8c545e6b
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy;
+import
org.apache.gravitino.maintenance.optimizer.api.common.PartitionStrategy.ScoreMode;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandlerContext;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class TestCompactionStrategyHandler {
+
+ private final Strategy strategy = new CompactionStrategyForTest();
+
+ @Test
+ void testShouldTriggerWithoutPartition() {
+ NameIdentifier tableId = NameIdentifier.of("db", "table");
+ Table tableMetadata = Mockito.mock(Table.class);
+ Mockito.when(tableMetadata.partitioning())
+ .thenReturn(new
org.apache.gravitino.rel.expressions.transforms.Transform[0]);
+
+ List<StatisticEntry<?>> stats =
+ Arrays.asList(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(2000L)));
+ StrategyHandlerContext context =
+ StrategyHandlerContext.builder(tableId, strategy)
+ .withTableMetadata(tableMetadata)
+ .withTableStatistics(stats)
+ .build();
+ CompactionStrategyHandler handler = new CompactionStrategyHandler();
+ handler.initialize(context);
+ Assertions.assertTrue(handler.shouldTrigger());
+
+ stats = Arrays.asList(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(10L)));
+ StrategyHandlerContext lowContext =
+ StrategyHandlerContext.builder(tableId, strategy)
+ .withTableMetadata(tableMetadata)
+ .withTableStatistics(stats)
+ .build();
+ CompactionStrategyHandler lowHandler = new CompactionStrategyHandler();
+ lowHandler.initialize(lowContext);
+ Assertions.assertFalse(lowHandler.shouldTrigger());
+ }
+
+ @Test
+ void testShouldTriggerActionWithPartitions() {
+ NameIdentifier tableId = NameIdentifier.of("db", "table");
+ Table tableMetadata = Mockito.mock(Table.class);
+ Mockito.when(tableMetadata.partitioning())
+ .thenReturn(
+ new org.apache.gravitino.rel.expressions.transforms.Transform[] {
+ Transforms.identity("p")
+ });
+
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+ Map.of(
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(10L))),
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(2000L))));
+
+ StrategyHandlerContext context =
+ StrategyHandlerContext.builder(tableId, strategy)
+ .withTableMetadata(tableMetadata)
+ .withTableStatistics(Arrays.asList())
+ .withPartitionStatistics(partitionStats)
+ .build();
+
+ CompactionStrategyHandler handler = new CompactionStrategyHandler();
+ handler.initialize(context);
+ Assertions.assertTrue(handler.shouldTrigger());
+
+ Map<PartitionPath, List<StatisticEntry<?>>> allLowStats =
+ Map.of(
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(1L))),
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(5L))));
+ StrategyHandlerContext lowContext =
+ StrategyHandlerContext.builder(tableId, strategy)
+ .withTableMetadata(tableMetadata)
+ .withTableStatistics(Arrays.asList())
+ .withPartitionStatistics(allLowStats)
+ .build();
+
+ CompactionStrategyHandler lowHandler = new CompactionStrategyHandler();
+ lowHandler.initialize(lowContext);
+ Assertions.assertFalse(lowHandler.shouldTrigger());
+ }
+
+ @Test
+ void testJobConfig() {
+ NameIdentifier tableId = NameIdentifier.of("db", "table");
+ Table tableMetadata = Mockito.mock(Table.class);
+ JobExecutionContext config =
+ new CompactionJobContext(
+ tableId,
+ strategy.jobOptions(),
+ strategy.jobTemplateName(),
+ tableMetadata.columns(),
+ tableMetadata.partitioning(),
+ List.of());
+ Assertions.assertTrue(config instanceof CompactionJobContext);
+ CompactionJobContext compactionConfig = (CompactionJobContext) config;
+ Assertions.assertEquals(tableId, compactionConfig.nameIdentifier());
+ Assertions.assertEquals(
+ ImmutableMap.of(CompactionStrategyForTest.TARGET_FILE_SIZE_BYTES,
"1024"),
+ compactionConfig.jobOptions());
+ Assertions.assertTrue(compactionConfig.getPartitions().isEmpty());
+ }
+
+ @Test
+ void testEvaluatePartitionTableScoreMode() {
+ NameIdentifier tableId = NameIdentifier.of("db", "table");
+ Table tableMetadata = Mockito.mock(Table.class);
+ Mockito.when(tableMetadata.partitioning())
+ .thenReturn(
+ new org.apache.gravitino.rel.expressions.transforms.Transform[] {
+ Transforms.identity("p")
+ });
+ Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]);
+
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+ Map.of(
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(10L))),
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(30L))));
+
+ Assertions.assertEquals(
+ 40L, evaluatePartitionScore(tableId, tableMetadata, partitionStats,
ScoreMode.SUM, null));
+ Assertions.assertEquals(
+ 30L, evaluatePartitionScore(tableId, tableMetadata, partitionStats,
ScoreMode.MAX, null));
+ Assertions.assertEquals(
+ 20L, evaluatePartitionScore(tableId, tableMetadata, partitionStats,
null, null));
+ }
+
+ @Test
+ void testEvaluateMaxPartitionNumFromStrategy() {
+ NameIdentifier tableId = NameIdentifier.of("db", "table");
+ Table tableMetadata = Mockito.mock(Table.class);
+ Mockito.when(tableMetadata.partitioning())
+ .thenReturn(
+ new org.apache.gravitino.rel.expressions.transforms.Transform[] {
+ Transforms.identity("p")
+ });
+ Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]);
+
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+ Map.of(
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "1"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(10L))),
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "2"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(20L))),
+ PartitionPath.of(Arrays.asList(new PartitionEntryImpl("p", "3"))),
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(30L))));
+
+ Assertions.assertEquals(
+ 30L, evaluatePartitionScore(tableId, tableMetadata, partitionStats,
null, 1));
+ Assertions.assertEquals(
+ 25L, evaluatePartitionScore(tableId, tableMetadata, partitionStats,
null, 2));
+ }
+
+ @Test
+ void testTopPartitionSelectionRespectsLimitAndOrder() {
+ NameIdentifier tableId = NameIdentifier.of("db", "table");
+ Table tableMetadata = Mockito.mock(Table.class);
+ Mockito.when(tableMetadata.partitioning())
+ .thenReturn(
+ new org.apache.gravitino.rel.expressions.transforms.Transform[] {
+ Transforms.identity("p")
+ });
+ Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]);
+
+ PartitionPath lowPartition = PartitionPath.of(Arrays.asList(new
PartitionEntryImpl("p", "1")));
+ PartitionPath highPartition = PartitionPath.of(Arrays.asList(new
PartitionEntryImpl("p", "2")));
+ PartitionPath midPartition = PartitionPath.of(Arrays.asList(new
PartitionEntryImpl("p", "3")));
+
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+ Map.of(
+ lowPartition,
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(10L))),
+ highPartition,
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(30L))),
+ midPartition,
+ List.of(new StatisticEntryImpl("datafile_mse",
StatisticValues.longValue(20L))));
+
+ StrategyHandlerContext context =
+ StrategyHandlerContext.builder(tableId, buildStrategy(null, 2))
+ .withTableMetadata(tableMetadata)
+ .withTableStatistics(List.of())
+ .withPartitionStatistics(partitionStats)
+ .build();
+
+ CompactionStrategyHandler handler = new CompactionStrategyHandler();
+ handler.initialize(context);
+ StrategyEvaluation evaluation = handler.evaluate();
+
+ CompactionJobContext jobContext =
+ (CompactionJobContext) evaluation.jobExecutionContext().orElseThrow();
+ List<PartitionPath> selected = jobContext.getPartitions();
+
+ Assertions.assertEquals(2, selected.size(), "Should return top two
partitions");
+ Assertions.assertEquals(highPartition, selected.get(0), "Highest score
should come first");
+ Assertions.assertEquals(midPartition, selected.get(1), "Second highest
score expected");
+ }
+
+ private long evaluatePartitionScore(
+ NameIdentifier tableId,
+ Table tableMetadata,
+ Map<PartitionPath, List<StatisticEntry<?>>> partitionStats,
+ ScoreMode scoreMode,
+ Integer maxPartitionNum) {
+ StrategyHandlerContext context =
+ StrategyHandlerContext.builder(tableId, buildStrategy(scoreMode,
maxPartitionNum))
+ .withTableMetadata(tableMetadata)
+ .withTableStatistics(List.of())
+ .withPartitionStatistics(partitionStats)
+ .build();
+
+ CompactionStrategyHandler handler = new CompactionStrategyHandler();
+ handler.initialize(context);
+ StrategyEvaluation evaluation = handler.evaluate();
+ Assertions.assertTrue(evaluation.jobExecutionContext().isPresent());
+ return evaluation.score();
+ }
+
+ private Strategy buildStrategy(ScoreMode scoreMode, Integer maxPartitionNum)
{
+ Map<String, Object> rules = new HashMap<>();
+ rules.put(StrategyUtils.TRIGGER_EXPR, "datafile_mse > 0");
+ rules.put(StrategyUtils.SCORE_EXPR, "datafile_mse");
+ if (scoreMode != null) {
+ rules.put(GravitinoStrategy.PARTITION_TABLE_SCORE_MODE, scoreMode);
+ }
+ if (maxPartitionNum != null) {
+ rules.put(GravitinoStrategy.MAX_PARTITION_NUM, maxPartitionNum);
+ }
+ return new PartitionStrategy() {
+ @Override
+ public String name() {
+ return "compaction-score-mode-test";
+ }
+
+ @Override
+ public String strategyType() {
+ return "compaction";
+ }
+
+ @Override
+ public Map<String, Object> rules() {
+ return rules;
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return Map.of();
+ }
+
+ @Override
+ public Map<String, String> jobOptions() {
+ return Map.of();
+ }
+
+ @Override
+ public String jobTemplateName() {
+ return "compaction-template";
+ }
+
+ @Override
+ public ScoreMode partitionTableScoreMode() {
+ Object value = rules.get(GravitinoStrategy.PARTITION_TABLE_SCORE_MODE);
+ return value instanceof ScoreMode ? (ScoreMode) value : ScoreMode.AVG;
+ }
+
+ @Override
+ public int maxPartitionNum() {
+ Object value = rules.get(GravitinoStrategy.MAX_PARTITION_NUM);
+ if (value == null) {
+ return 100;
+ }
+ if (value instanceof Number) {
+ int parsed = ((Number) value).intValue();
+ return parsed > 0 ? parsed : 100;
+ }
+ try {
+ int parsed = Integer.parseInt(value.toString());
+ return parsed > 0 ? parsed : 100;
+ } catch (NumberFormatException e) {
+ return 100;
+ }
+ }
+ };
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
new file mode 100644
index 0000000000..83062a773b
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+
// Integration tests for the builtin Iceberg rewrite-data-files (compaction) job template.
// Requires a running Gravitino server and Spark environment; enable with GRAVITINO_ENV_IT=true.
@EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
public class TestBuiltinIcebergRewriteDataFiles {

  // Endpoints and fixed names for the locally running services these ITs talk to.
  private static final String SERVER_URI = "http://localhost:8090";
  private static final String METALAKE_NAME = "test";
  private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg";
  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-rewrite-data-files";
  private static final String SPARK_CATALOG_NAME = "rest_catalog";
  // Empty string — presumably the REST catalog's default warehouse is used; TODO confirm.
  private static final String WAREHOUSE_LOCATION = "";

  // Submits a compaction job using a manually assembled job configuration.
  @Test
  void testSubmitRewriteDataFilesJobByConfig() throws Exception {
    String tableName = "rewrite_table1";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createTableAndInsertData(spark, fullTableName);
          Map<String, String> jobConf = buildManualJobConfig(tableName);
          submitCompactionJob(metalake, jobConf);
        });
  }

  // Submits a compaction job whose configuration is produced by the job adapter from an
  // OptimizerConfig plus a CompactionJobContext (no partitions: whole-table compaction).
  @Test
  void testSubmitBuiltinIcebergRewriteDataFilesJobFromAdapterAndOptimizerConfig() throws Exception {
    String tableName = "rewrite_table2";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createTableAndInsertData(spark, fullTableName);
          OptimizerConfig optimizerConfig = createOptimizerConfig();
          Map<String, String> jobOptions =
              Map.of("min-input-files", "1", "target-file-size-bytes", "1048576");
          Map<String, String> jobConf =
              buildCompactionJobConfig(
                  optimizerConfig,
                  tableName,
                  jobOptions,
                  new Column[0],
                  new Transform[0],
                  Collections.emptyList());
          submitCompactionJob(metalake, jobConf);
        });
  }

  // Compacting a non-partitioned table should reduce the number of data files.
  @Test
  void testCompactNonPartitionTable() throws Exception {
    String tableName = "rewrite_non_partition_table";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;

    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createTableAndInsertData(spark, fullTableName);

          long beforeFiles = countDataFiles(spark, fullTableName);
          Assertions.assertTrue(beforeFiles > 1, "Expected multiple data files before compaction");

          OptimizerConfig optimizerConfig = createOptimizerConfig();
          // min-input-files=1 forces the rewrite to run even for small file groups.
          Map<String, String> jobOptions = Map.of("min-input-files", "1");
          Map<String, String> jobConf =
              buildCompactionJobConfig(
                  optimizerConfig,
                  tableName,
                  jobOptions,
                  new Column[0],
                  new Transform[0],
                  Collections.emptyList());
          submitCompactionJob(metalake, jobConf);

          refreshTable(spark, fullTableName);
          long afterFiles = countDataFiles(spark, fullTableName);
          Assertions.assertTrue(
              afterFiles < beforeFiles,
              String.format(
                  "Expected fewer data files after compaction after:%d, before:%d",
                  afterFiles, beforeFiles));
        });
  }

  // Compaction scoped to the two 2024 partitions must not touch the 2025 partitions.
  @Test
  void testCompactPartitionTable() throws Exception {
    String tableName = "rewrite_partition_table";
    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;

    runWithSparkAndMetalake(
        (spark, metalake) -> {
          createPartitionTableAndInsertData(spark, fullTableName);

          Map<String, Long> beforeCounts = countDataFilesByPartition(spark, fullTableName);
          Map<String, Long> beforeMaxSizes = maxFileSizeByPartition(spark, fullTableName);
          String targetPartition1 = partitionKey(2024, "1");
          String targetPartition2 = partitionKey(2024, "2");
          String otherPartition1 = partitionKey(2025, "1");
          String otherPartition2 = partitionKey(2025, "2");

          // Precondition: every partition starts with multiple small files.
          Assertions.assertTrue(
              beforeCounts.getOrDefault(targetPartition1, 0L) > 1,
              "Expected multiple data files in " + targetPartition1 + " before compaction");
          Assertions.assertTrue(
              beforeCounts.getOrDefault(targetPartition2, 0L) > 1,
              "Expected multiple data files in " + targetPartition2 + " before compaction");
          Assertions.assertTrue(
              beforeCounts.getOrDefault(otherPartition1, 0L) > 1,
              "Expected multiple data files in " + otherPartition1 + " before compaction");
          Assertions.assertTrue(
              beforeCounts.getOrDefault(otherPartition2, 0L) > 1,
              "Expected multiple data files in " + otherPartition2 + " before compaction");

          OptimizerConfig optimizerConfig = createOptimizerConfig();

          // Only the two 2024 partitions are handed to the compaction job.
          List<PartitionPath> partitions =
              Arrays.asList(
                  PartitionPath.of(
                      Arrays.asList(
                          new PartitionEntryImpl("year", "2024"),
                          new PartitionEntryImpl("month", "1"))),
                  PartitionPath.of(
                      Arrays.asList(
                          new PartitionEntryImpl("year", "2024"),
                          new PartitionEntryImpl("month", "2"))));
          Column[] columns =
              new Column[] {
                column("year", Types.IntegerType.get()), column("month", Types.StringType.get())
              };
          Transform[] partitioning =
              new Transform[] {Transforms.identity("year"), Transforms.identity("month")};

          Map<String, String> jobOptions = Map.of("min-input-files", "1");
          Map<String, String> jobConf =
              buildCompactionJobConfig(
                  optimizerConfig, tableName, jobOptions, columns, partitioning, partitions);
          submitCompactionJob(metalake, jobConf);

          refreshTable(spark, fullTableName);
          Map<String, Long> afterCounts = countDataFilesByPartition(spark, fullTableName);
          Map<String, Long> afterMaxSizes = maxFileSizeByPartition(spark, fullTableName);
          // Target partitions: merging small files should increase the max file size.
          Assertions.assertTrue(
              afterMaxSizes.getOrDefault(targetPartition1, 0L)
                  > beforeMaxSizes.getOrDefault(targetPartition1, 0L),
              "Expected larger data files after compaction for " + targetPartition1);
          Assertions.assertTrue(
              afterMaxSizes.getOrDefault(targetPartition2, 0L)
                  > beforeMaxSizes.getOrDefault(targetPartition2, 0L),
              "Expected larger data files after compaction for " + targetPartition2);
          // Untouched partitions: both file count and max file size must be unchanged.
          Assertions.assertEquals(
              beforeCounts.getOrDefault(otherPartition1, 0L),
              afterCounts.getOrDefault(otherPartition1, 0L),
              "Expected no compaction for " + otherPartition1);
          Assertions.assertEquals(
              beforeCounts.getOrDefault(otherPartition2, 0L),
              afterCounts.getOrDefault(otherPartition2, 0L),
              "Expected no compaction for " + otherPartition2);
          Assertions.assertEquals(
              beforeMaxSizes.getOrDefault(otherPartition1, 0L),
              afterMaxSizes.getOrDefault(otherPartition1, 0L),
              "Expected no size changes for " + otherPartition1);
          Assertions.assertEquals(
              beforeMaxSizes.getOrDefault(otherPartition2, 0L),
              afterMaxSizes.getOrDefault(otherPartition2, 0L),
              "Expected no size changes for " + otherPartition2);
        });
  }

  // Loads the metalake if it exists, otherwise creates it so the ITs are re-runnable.
  private static GravitinoMetalake loadOrCreateMetalake(
      GravitinoAdminClient client, String metalakeName) {
    try {
      return client.loadMetalake(metalakeName);
    } catch (NoSuchMetalakeException ignored) {
      return client.createMetalake(metalakeName, "IT metalake", Map.of());
    }
  }

  // Test body receiving a live SparkSession and GravitinoMetalake.
  @FunctionalInterface
  private interface SparkMetalakeConsumer {
    void accept(SparkSession spark, GravitinoMetalake metalake) throws Exception;
  }

  // Runs the body with a fresh SparkSession and admin client; always stops Spark afterwards.
  private static void runWithSparkAndMetalake(SparkMetalakeConsumer consumer) throws Exception {
    SparkSession spark = createSparkSession();
    try (GravitinoAdminClient client = GravitinoAdminClient.builder(SERVER_URI).build()) {
      GravitinoMetalake metalake = loadOrCreateMetalake(client, METALAKE_NAME);
      consumer.accept(spark, metalake);
    } finally {
      spark.stop();
    }
  }

  // Submits the job, polls (up to 5 minutes) for a terminal status, and asserts SUCCEEDED.
  private static void submitCompactionJob(GravitinoMetalake metalake, Map<String, String> jobConf) {
    JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf);
    System.out.println("Submitted job id: " + jobHandle.jobId());
    Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank");

    Awaitility.await()
        .atMost(Duration.ofMinutes(5))
        .pollInterval(Duration.ofSeconds(2))
        .until(
            () -> {
              JobHandle.Status status = metalake.getJob(jobHandle.jobId()).jobStatus();
              return status == JobHandle.Status.SUCCEEDED
                  || status == JobHandle.Status.FAILED
                  || status == JobHandle.Status.CANCELLED;
            });

    JobHandle.Status finalStatus = metalake.getJob(jobHandle.jobId()).jobStatus();
    Assertions.assertEquals(JobHandle.Status.SUCCEEDED, finalStatus, "Job should succeed");
  }

  // Hand-written job configuration mirroring the keys the adapter would produce.
  private static Map<String, String> buildManualJobConfig(String tableName) {
    Map<String, String> jobConf = new HashMap<>();
    jobConf.put("table_identifier", "db." + tableName);
    jobConf.put("where_clause", "");
    jobConf.put("sort_order", "");
    jobConf.put("strategy", "binpack");
    jobConf.put("options", "{\"min-input-files\":\"1\"}");
    jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs());
    return jobConf;
  }

  // Builds the job configuration through CompactionJobContext + the Gravitino adapter.
  private static Map<String, String> buildCompactionJobConfig(
      OptimizerConfig optimizerConfig,
      String tableName,
      Map<String, String> jobOptions,
      Column[] columns,
      Transform[] partitioning,
      List<PartitionPath> partitions) {
    CompactionJobContext jobContext =
        new CompactionJobContext(
            NameIdentifier.of(SPARK_CATALOG_NAME, "db", tableName),
            jobOptions,
            JOB_TEMPLATE_NAME,
            columns,
            partitioning,
            partitions);
    GravitinoCompactionJobAdapter adapter = new GravitinoCompactionJobAdapter();
    return GravitinoJobSubmitter.buildJobConfig(optimizerConfig, jobContext, adapter);
  }

  // Counts live data files (content = 0) via the Iceberg ".files" metadata table.
  private static long countDataFiles(SparkSession spark, String fullTableName) {
    Row row =
        spark
            .sql(
                "SELECT count(*) AS data_file_cnt FROM "
                    + fullTableName
                    + ".files WHERE content = 0")
            .first();
    return row.getLong(0);
  }

  private static void refreshTable(SparkSession spark, String fullTableName) {
    spark.sql("REFRESH TABLE " + fullTableName);
  }

  // Data-file count per (year, month) partition, keyed by partitionKey().
  private static Map<String, Long> countDataFilesByPartition(
      SparkSession spark, String fullTableName) {
    List<Row> rows =
        spark
            .sql(
                "SELECT partition.year AS year, partition.month AS month, COUNT(*) AS data_file_cnt "
                    + "FROM "
                    + fullTableName
                    + ".files WHERE content = 0 "
                    + "GROUP BY partition.year, partition.month")
            .collectAsList();
    Map<String, Long> counts = new HashMap<>();
    for (Row row : rows) {
      int year = row.getInt(0);
      String month = row.getString(1);
      long count = row.getLong(2);
      counts.put(partitionKey(year, month), count);
    }
    return counts;
  }

  // Largest data-file size per (year, month) partition, keyed by partitionKey().
  private static Map<String, Long> maxFileSizeByPartition(
      SparkSession spark, String fullTableName) {
    List<Row> rows =
        spark
            .sql(
                "SELECT partition.year AS year, partition.month AS month, "
                    + "MAX(file_size_in_bytes) AS max_size "
                    + "FROM "
                    + fullTableName
                    + ".files WHERE content = 0 "
                    + "GROUP BY partition.year, partition.month")
            .collectAsList();
    Map<String, Long> sizes = new HashMap<>();
    for (Row row : rows) {
      int year = row.getInt(0);
      String month = row.getString(1);
      long maxSize = row.getLong(2);
      sizes.put(partitionKey(year, month), maxSize);
    }
    return sizes;
  }

  // Canonical map key for a (year, month) partition.
  private static String partitionKey(int year, String month) {
    return "year=" + year + ",month=" + month;
  }

  private static Column column(String name, org.apache.gravitino.rel.types.Type type) {
    return ColumnDTO.builder().withName(name).withDataType(type).build();
  }

  // OptimizerConfig carrying job-submitter settings for a small local-mode Spark run.
  private static OptimizerConfig createOptimizerConfig() {
    Map<String, String> optimizerConfigProps = new HashMap<>();
    optimizerConfigProps.put(OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "catalog_type", "rest");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "catalog_uri", ICEBERG_REST_URI);
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "warehouse_location", WAREHOUSE_LOCATION);
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "catalog_name", SPARK_CATALOG_NAME);
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_master", "local[2]");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_executor_instances", "1");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_executor_cores", "1");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_executor_memory", "1g");
    optimizerConfigProps.put(
        OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_driver_memory", "1g");
    optimizerConfigProps.put(OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "spark_conf", "{}");
    return new OptimizerConfig(optimizerConfigProps);
  }

  // Local SparkSession wired to the Iceberg REST catalog. Catalog caching is disabled —
  // presumably so post-compaction metadata is visible without stale caches; TODO confirm.
  private static SparkSession createSparkSession() {
    return SparkSession.builder()
        .master("local[2]")
        .appName("builtin-iceberg-rewrite-data-files-it")
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".type", "rest")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".cache-enabled", "false")
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".uri", ICEBERG_REST_URI)
        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".warehouse", WAREHOUSE_LOCATION)
        .getOrCreate();
  }

  // Creates a (year, month)-partitioned table and inserts rows one statement at a time so
  // each of the four partitions accumulates many small data files.
  private static void createPartitionTableAndInsertData(SparkSession spark, String fullTableName) {
    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db");
    spark.sql("DROP TABLE IF EXISTS " + fullTableName);
    spark.sql(
        "CREATE TABLE "
            + fullTableName
            + " (id INT, data STRING, year INT, month STRING) USING iceberg "
            + "PARTITIONED BY (year, month)");
    spark.sql(
        "ALTER TABLE "
            + fullTableName
            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
    int id = 0;
    int[] years = new int[] {2024, 2025};
    String[] months = new String[] {"1", "2"};
    int rowsPerPartition = 20;
    for (int year : years) {
      for (String month : months) {
        for (int i = 0; i < rowsPerPartition; i++) {
          spark.sql(
              "INSERT INTO "
                  + fullTableName
                  + " VALUES ("
                  + id
                  + ", 'value_"
                  + id
                  + "', "
                  + year
                  + ", '"
                  + month
                  + "'"
                  + ")");
          id++;
        }
      }
    }
  }

  // Creates an unpartitioned table and inserts rows one-by-one to produce many small files.
  private static void createTableAndInsertData(SparkSession spark, String fullTableName) {
    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db");
    spark.sql("DROP TABLE IF EXISTS " + fullTableName);
    spark.sql("CREATE TABLE " + fullTableName + " (id INT, data STRING) USING iceberg");
    spark.sql(
        "ALTER TABLE "
            + fullTableName
            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
    for (int i = 0; i < 10; i++) {
      spark.sql("INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + "')");
    }
  }
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoCompactionJobAdapter.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoCompactionJobAdapter.java
new file mode 100644
index 0000000000..500138a7fb
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoCompactionJobAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoCompactionJobAdapter {
+
+ @Test
+ public void testJobTemplateName() {
+ GravitinoCompactionJobAdapter jobAdapter = new
GravitinoCompactionJobAdapter();
+ Assertions.assertEquals(
+ Map.of(
+ "table_identifier", "db.table",
+ "where_clause", "",
+ "sort_order", "",
+ "strategy", "binpack",
+ "options", "{\"target_file_size_bytes\":\"1073741824\"}"),
+ jobAdapter.jobConfig(mockCompactionJobContext()));
+ }
+
+ private CompactionJobContext mockCompactionJobContext() {
+ String jobTemplateName = "compaction-job-template";
+ Column[] columns = new Column[0];
+ Transform[] partitioning = new Transform[0];
+ Map<String, String> jobOptions = Map.of("target_file_size_bytes",
"1073741824");
+ return new CompactionJobContext(
+ NameIdentifier.of("catalog", "db", "table"),
+ jobOptions,
+ jobTemplateName,
+ columns,
+ partitioning,
+ List.of());
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoJobSubmitter.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoJobSubmitter.java
new file mode 100644
index 0000000000..621f41f7d5
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestGravitinoJobSubmitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoJobSubmitter {
+ @Test
+ void loadJobAdapterReturnsCompactionAdapter() {
+ GravitinoJobSubmitter submitter = new GravitinoJobSubmitter();
+ GravitinoJobAdapter adapter =
submitter.loadJobAdapter(CompactionStrategyHandler.NAME);
+ Assertions.assertTrue(adapter instanceof GravitinoCompactionJobAdapter);
+ }
+
+ @Test
+ void loadJobAdapterFallsBackToConfiguredClassName() {
+ String jobTemplateName = "custom";
+ OptimizerConfig config =
+ new OptimizerConfig(
+ Map.of(
+ OptimizerConfig.GRAVITINO_URI,
+ "http://localhost:8090",
+ OptimizerConfig.GRAVITINO_METALAKE,
+ "test-metalake",
+ OptimizerConfig.JOB_ADAPTER_PREFIX + jobTemplateName +
".className",
+ GravitinoCompactionJobAdapter.class.getName()));
+ GravitinoJobSubmitter submitter = new GravitinoJobSubmitter();
+ submitter.initialize(new OptimizerEnv(config));
+
+ GravitinoJobAdapter adapter = submitter.loadJobAdapter(jobTemplateName);
+ Assertions.assertTrue(adapter instanceof GravitinoCompactionJobAdapter);
+ }
+
+ @Test
+ void buildJobConfigMergesWithExpectedPrecedence() {
+ OptimizerConfig config =
+ new OptimizerConfig(
+ Map.of(
+ OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "custom",
"optimizer",
+ OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "override",
"optimizer"));
+ GravitinoJobSubmitter submitter = new GravitinoJobSubmitter();
+ submitter.initialize(new OptimizerEnv(config));
+
+ JobExecutionContext context =
+ new JobExecutionContext() {
+ @Override
+ public NameIdentifier nameIdentifier() {
+ return NameIdentifier.of("db", "table");
+ }
+
+ @Override
+ public Map<String, String> jobOptions() {
+ return Map.of("context", "context", "override", "context");
+ }
+
+ @Override
+ public String jobTemplateName() {
+ return "compaction";
+ }
+ };
+
+ GravitinoJobAdapter adapter =
+ jobExecutionContext ->
+ Map.of("table", "db.table", "options", "map('k','v')", "override",
"adapter");
+
+ Map<String, String> merged = GravitinoJobSubmitter.buildJobConfig(config,
context, adapter);
+
+ Assertions.assertEquals("optimizer", merged.get("custom"));
+ Assertions.assertEquals("adapter", merged.get("override"));
+ Assertions.assertEquals("db.table", merged.get("table"));
+ Assertions.assertEquals("map('k','v')", merged.get("options"));
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestPartitionUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestPartitionUtils.java
new file mode 100644
index 0000000000..6ff4859513
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestPartitionUtils.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPartitionUtils {
+
+ private static Column column(String name,
org.apache.gravitino.rel.types.Type type) {
+ return ColumnDTO.builder().withName(name).withDataType(type).build();
+ }
+
+ @Test
+ void testIdentityStringPartition() {
+ List<PartitionEntry> partitions = Arrays.asList(new
PartitionEntryImpl("colA", "O'Hara"));
+ Column[] columns = new Column[] {column("colA", Types.StringType.get())};
+ Transform[] transforms = new Transform[] {Transforms.identity("colA")};
+
+ String where = PartitionUtils.getWhereClauseForPartition(partitions,
columns, transforms);
+
+ Assertions.assertEquals("colA = \"O'Hara\"", where);
+ }
+
+ @Test
+ void testIdentityStringPartitionEscapesQuotesAndBackslashes() {
+ List<PartitionEntry> partitions = Arrays.asList(new
PartitionEntryImpl("colA", "a\"b\\c"));
+ Column[] columns = new Column[] {column("colA", Types.StringType.get())};
+ Transform[] transforms = new Transform[] {Transforms.identity("colA")};
+
+ String where = PartitionUtils.getWhereClauseForPartition(partitions,
columns, transforms);
+
+ Assertions.assertEquals("colA = \"a\\\"b\\\\c\"", where);
+ }
+
+ @Test
+ void testIdentityTimestampPartition() {
+ List<PartitionEntry> partitions =
+ Arrays.asList(new PartitionEntryImpl("ts", "2024-01-01 00:00:00"));
+ Column[] columns = new Column[] {column("ts",
Types.TimestampType.withTimeZone())};
+ Transform[] transforms = new Transform[] {Transforms.identity("ts")};
+
+ String where = PartitionUtils.getWhereClauseForPartition(partitions,
columns, transforms);
+
+ Assertions.assertEquals("ts = TIMESTAMP '2024-01-01 00:00:00'", where);
+ }
+
+ @Test
+ void testBucketAndIdentityPartition() {
+ List<PartitionEntry> partitions =
+ Arrays.asList(new PartitionEntryImpl("colA", "abc"), new
PartitionEntryImpl("bucket", "1"));
+ Column[] columns =
+ new Column[] {
+ column("colA", Types.StringType.get()), column("colB",
Types.IntegerType.get())
+ };
+ Transform[] transforms =
+ new Transform[] {Transforms.identity("colA"), Transforms.bucket(2, new
String[] {"colB"})};
+
+ String where = PartitionUtils.getWhereClauseForPartition(partitions,
columns, transforms);
+
+ Assertions.assertEquals("colA = \"abc\" AND bucket(colB, 2) = 1", where);
+ }
+
+ @Test
+ void testTruncateAndYearPartition() {
+ List<PartitionEntry> partitions =
+ Arrays.asList(
+ new PartitionEntryImpl("truncate", "prefix"), new
PartitionEntryImpl("year", "2024"));
+ Column[] columns =
+ new Column[] {
+ column("colC", Types.StringType.get()),
+ column("colD", Types.TimestampType.withoutTimeZone())
+ };
+ Transform[] transforms =
+ new Transform[] {Transforms.truncate(5, "colC"),
Transforms.year("colD")};
+
+ String where = PartitionUtils.getWhereClauseForPartition(partitions,
columns, transforms);
+
+ Assertions.assertEquals("truncate(colC, 5) = \"prefix\" AND year(colD) =
2024", where);
+ }
+
+ @Test
+ void
testWhereClauseCombinesMultiplePartitionsWithParenthesesForStringColumns() {
+ Column[] columns =
+ new Column[] {column("a", Types.StringType.get()), column("b",
Types.StringType.get())};
+
+ List<PartitionEntry> p1 =
+ Arrays.asList(new PartitionEntryImpl("p1", "x1"), new
PartitionEntryImpl("p1", "y1"));
+ List<PartitionEntry> p2 =
+ Arrays.asList(new PartitionEntryImpl("p2", "x2"), new
PartitionEntryImpl("p2", "y2"));
+ List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1),
PartitionPath.of(p2));
+
+ Transform[] partitioning = new Transform[] {Transforms.identity("a"),
Transforms.identity("b")};
+
+ String where = PartitionUtils.getWhereClauseForPartitions(partitions,
columns, partitioning);
+
+ Assertions.assertEquals("(a = \"x1\" AND b = \"y1\") OR (a = \"x2\" AND b
= \"y2\")", where);
+ }
+
+ @Test
+ void testWhereClauseEmitsNumericLiteralsWithoutQuotes() {
+ Column[] columns =
+ new Column[] {column("id", Types.LongType.get()), column("score",
Types.DoubleType.get())};
+
+ List<PartitionEntry> p1 =
+ Arrays.asList(new PartitionEntryImpl("p1", "123"), new
PartitionEntryImpl("p1", "45.6"));
+ List<PartitionEntry> p2 =
+ Arrays.asList(new PartitionEntryImpl("p2", "456"), new
PartitionEntryImpl("p2", "78.9"));
+ List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1),
PartitionPath.of(p2));
+
+ Transform[] partitioning =
+ new Transform[] {Transforms.identity("id"),
Transforms.identity("score")};
+
+ String where = PartitionUtils.getWhereClauseForPartitions(partitions,
columns, partitioning);
+
+ Assertions.assertEquals("(id = 123 AND score = 45.6) OR (id = 456 AND
score = 78.9)", where);
+ }
+
+ @Test
+ void testIdentityOnDateColumnIsFormattedAsDateLiteral() {
+ Column[] columns = new Column[] {column("dt", Types.DateType.get())};
+
+ List<PartitionEntry> p1 = Arrays.asList(new PartitionEntryImpl("p",
"2024-01-01"));
+ List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1));
+
+ Transform[] partitioning = new Transform[] {Transforms.identity("dt")};
+
+ String where = PartitionUtils.getWhereClauseForPartitions(partitions,
columns, partitioning);
+
+ Assertions.assertEquals("(dt = DATE '2024-01-01')", where);
+ }
+
+ @Test
+ void testWhereClauseWithMixedTransformsAndMultiplePartitions() {
+ Column[] columns =
+ new Column[] {
+ column("event.ts", Types.TimestampType.withoutTimeZone()),
+ column("country", Types.StringType.get()),
+ column("user_id", Types.LongType.get()),
+ column("flag", Types.BooleanType.get())
+ };
+
+ Transform[] partitioning =
+ new Transform[] {
+ Transforms.hour(new String[] {"event", "ts"}),
+ Transforms.truncate(3, "country"),
+ Transforms.bucket(16, new String[] {"user_id"}),
+ Transforms.identity("flag")
+ };
+
+ List<PartitionEntry> p1 =
+ Arrays.asList(
+ new PartitionEntryImpl("p", "8"),
+ new PartitionEntryImpl("p", "usa"),
+ new PartitionEntryImpl("p", "5"),
+ new PartitionEntryImpl("p", "false"));
+ List<PartitionEntry> p2 =
+ Arrays.asList(
+ new PartitionEntryImpl("p", "12"),
+ new PartitionEntryImpl("p", "can"),
+ new PartitionEntryImpl("p", "9"),
+ new PartitionEntryImpl("p", "true"));
+ List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1),
PartitionPath.of(p2));
+
+ String where = PartitionUtils.getWhereClauseForPartitions(partitions,
columns, partitioning);
+
+ Assertions.assertEquals(
+ "(hour(event.ts) = 8 AND truncate(country, 3) = \"usa\" AND
bucket(user_id, 16) = 5 AND flag = false) "
+ + "OR (hour(event.ts) = 12 AND truncate(country, 3) = \"can\" AND
bucket(user_id, 16) = 9 AND flag = true)",
+ where);
+ }
+
+ @Test
+ void testNullPartitionsThrowsIllegalArgumentException() {
+ Column[] columns = new Column[] {column("a", Types.StringType.get())};
+ Transform[] partitioning = new Transform[] {Transforms.identity("a")};
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> PartitionUtils.getWhereClauseForPartitions(null, columns,
partitioning));
+ }
+
+ @Test
+ void testEmptyColumnsThrowsIllegalArgumentException() {
+ List<PartitionEntry> p1 = Arrays.asList(new PartitionEntryImpl("p", "v"));
+ List<PartitionPath> partitions = Arrays.asList(PartitionPath.of(p1));
+ Column[] columns = new Column[0];
+ Transform[] partitioning = new Transform[] {Transforms.identity("p")};
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> PartitionUtils.getWhereClauseForPartitions(partitions, columns,
partitioning));
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestQLExpressionEvaluator.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestQLExpressionEvaluator.java
new file mode 100644
index 0000000000..cf372fcd38
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/util/TestQLExpressionEvaluator.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@code QLExpressionEvaluator}: numeric and boolean evaluation,
+ * argument validation, and the rewriting of hyphenated identifiers (e.g.
+ * {@code metric-1}) so they are looked up as single context keys rather than
+ * parsed as subtraction.
+ */
+public class TestQLExpressionEvaluator {
+
+  private final QLExpressionEvaluator evaluator = new QLExpressionEvaluator();
+
+  @Test
+  void testEvaluateLongWithValidExpression() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("a", 10);
+    context.put("b", 20);
+
+    long result = evaluator.evaluateLong("a + b", context);
+    assertEquals(30L, result);
+  }
+
+  @Test
+  void testEvaluateLongWithDecimalResult() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("a", 10);
+    context.put("b", 3);
+
+    // 10 / 3 yields 3 whether the engine performs integer division or rounds
+    // the double quotient 3.33 to the nearest long (the mixed-type test below
+    // shows 15.5 -> 16, i.e. rounding, for double results).
+    long result = evaluator.evaluateLong("a / b", context);
+    assertEquals(3L, result);
+  }
+
+  @Test
+  void testEvaluateBoolWithTrueCondition() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("x", 5);
+    context.put("y", 10);
+
+    boolean result = evaluator.evaluateBool("x < y", context);
+    assertTrue(result);
+  }
+
+  @Test
+  void testEvaluateBoolWithFalseCondition() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("x", 15);
+    context.put("y", 10);
+
+    boolean result = evaluator.evaluateBool("x < y", context);
+    Assertions.assertFalse(result);
+  }
+
+  @Test
+  void testEvaluateWithMissingVariable() {
+    // Referencing an undeclared variable ("b") must fail rather than
+    // silently defaulting to null/0.
+    Map<String, Object> context = new HashMap<>();
+    context.put("a", 10);
+
+    assertThrows(RuntimeException.class, () -> evaluator.evaluateLong("a + b", context));
+  }
+
+  @Test
+  void testEvaluateWithInvalidExpression() {
+    Map<String, Object> context = new HashMap<>();
+
+    assertThrows(
+        RuntimeException.class, () -> evaluator.evaluateLong("invalid expression", context));
+  }
+
+  @Test
+  void testEvaluateWithConstantExpression() {
+    // NOTE(review): a null context is rejected even for a constant expression
+    // that needs no variables — confirm this is the intended contract.
+    Assertions.assertThrowsExactly(
+        IllegalArgumentException.class, () -> evaluator.evaluateLong("1 + 1", null));
+  }
+
+  @Test
+  void testEvaluateWithNullExpression() {
+    Map<String, Object> context = new HashMap<>();
+
+    assertThrows(IllegalArgumentException.class, () -> evaluator.evaluateLong(null, context));
+  }
+
+  @Test
+  void testEvaluateWithDifferentVariableTypes() {
+    Map<String, Object> context = new HashMap<>();
+    context.put("intVal", 10);
+    context.put("doubleVal", 5.5);
+    context.put("stringVal", "hello");
+    context.put("boolVal", true);
+
+    // Mixed int/double arithmetic: 10 + 5.5 = 15.5 is rounded to 16.
+    long numericResult = evaluator.evaluateLong("intVal + doubleVal", context);
+    assertEquals(16L, numericResult);
+
+    // Boolean operators compose with numeric comparisons.
+    boolean boolResult = evaluator.evaluateBool("boolVal && (intVal > 5)", context);
+    assertTrue(boolResult);
+  }
+
+  @Test
+  void testHyphenatedIdentifiersDoNotBreakSubtraction() {
+    // "metric-1" is a single context key; the standalone "-" is real subtraction.
+    Map<String, Object> context = new HashMap<>();
+    context.put("metric-1", 10);
+    context.put("metric2", 4);
+
+    long result = evaluator.evaluateLong("metric-1 - metric2", context);
+    assertEquals(6L, result);
+  }
+
+  @Test
+  void testNegativeLiteralPreserved() {
+    // A literal "-1" must not be mistaken for part of a hyphenated identifier.
+    Map<String, Object> context = new HashMap<>();
+    context.put("x", 2);
+
+    long result = evaluator.evaluateLong("x + -1", context);
+    assertEquals(1L, result);
+  }
+
+  @Test
+  void testHyphenatedIdentifierNotMatchedInsideLargerToken() {
+    // "metric-1" must not greedily match inside the longer key "metric-1-extra".
+    Map<String, Object> context = new HashMap<>();
+    context.put("metric-1", 5);
+    context.put("metric-1-extra", 7);
+
+    long result = evaluator.evaluateLong("metric-1 + metric-1-extra", context);
+    assertEquals(12L, result);
+  }
+
+  @Test
+  void testHyphenatedIdentifierAfterPlusOperator() {
+    // Renamed from testHyphenatedIdentifierNextToDotIsNotRewritten: the
+    // expression below contains no dot, so the old name did not describe the
+    // body. It verifies a hyphenated key following a "+" operator resolves.
+    Map<String, Object> context = new HashMap<>();
+    context.put("metric-1", 3);
+    context.put("a", 2);
+
+    long result = evaluator.evaluateLong("a + metric-1", context);
+    assertEquals(5L, result);
+  }
+}