This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new ab713b151 [AMORO-3775] Add support for metric-based refresh event 
trigger in TableRuntimeRefreshExecutor (#3776)
ab713b151 is described below

commit ab713b1513cb98ea325a3ef3b5ff67dce4a89e85
Author: Jzjsnow <[email protected]>
AuthorDate: Mon Dec 22 16:46:44 2025 +0800

    [AMORO-3775] Add support for metric-based refresh event trigger in 
TableRuntimeRefreshExecutor (#3776)
    
    [AMORO-3775] Add support for metadata-based refresh event in 
TableRuntimeRefreshExecutor
---
 .../inline/TableRuntimeRefreshExecutor.java        |  15 +-
 .../amoro/server/table/DefaultTableRuntime.java    |   1 +
 .../amoro/server/table/TableConfigurations.java    |  12 +-
 .../amoro/server/utils/IcebergTableUtil.java       |  38 +-
 .../plan/TestHiveKeyedPartitionPlan.java           |   3 +-
 .../plan/TestHiveUnkeyedPartitionPlan.java         |   3 +-
 .../optimizing/plan/TestIcebergPartitionPlan.java  |   3 +-
 .../optimizing/plan/TestKeyedPartitionPlan.java    |   3 +-
 .../optimizing/plan/TestUnkeyedPartitionPlan.java  |   3 +-
 .../org/apache/amoro/config/OptimizingConfig.java  |  38 +-
 .../evaluation/MetadataBasedEvaluationEvent.java   | 103 ++++
 .../MixedAndIcebergTableStatsProvider.java         |  62 +++
 .../optimizing/evaluation/TableStatsProvider.java  |  46 ++
 .../plan/AbstractOptimizingEvaluator.java          |   5 +-
 .../optimizing/plan/AbstractOptimizingPlanner.java |   6 +-
 .../optimizing/plan/AbstractPartitionPlan.java     |  13 +-
 .../optimizing/plan/CommonPartitionEvaluator.java  |  57 ++-
 .../optimizing/plan/IcebergOptimizerEvaluator.java |   9 +-
 .../optimizing/plan/IcebergOptimizingPlanner.java  |   9 +-
 .../optimizing/plan/IcebergPartitionPlan.java      |   6 +-
 .../plan/MixedIcebergOptimizingEvaluator.java      |   9 +-
 .../plan/MixedIcebergOptimizingPlanner.java        |   9 +-
 .../optimizing/plan/MixedIcebergPartitionPlan.java |  20 +-
 .../org/apache/amoro/table/TableProperties.java    |  13 +
 .../TestMetadataBasedEvaluationEvent.java          | 549 +++++++++++++++++++++
 .../plan/MixedHiveOptimizingEvaluator.java         |   9 +-
 .../plan/MixedHiveOptimizingPlanner.java           |   9 +-
 .../optimizing/plan/MixedHivePartitionPlan.java    |  15 +-
 28 files changed, 1017 insertions(+), 51 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index bbc63125b..18071a2cd 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -20,7 +20,9 @@ package org.apache.amoro.server.scheduler.inline;
 
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
@@ -60,8 +62,19 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
 
   private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, 
MixedTable table) {
     // only evaluate pending input when optimizing is enabled and in idle state
-    if (tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()
+    OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
+    if (optimizingConfig.isEnabled()
         && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
+
+      if (optimizingConfig.isMetadataBasedTriggerEnabled()
+          && !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
+              optimizingConfig, table, tableRuntime.getLastPlanTime())) {
+        logger.debug(
+            "{} optimizing is not necessary due to metadata based trigger",
+            tableRuntime.getTableIdentifier());
+        return;
+      }
+
       AbstractOptimizingEvaluator evaluator =
           IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, 
maxPendingPartitions);
       if (evaluator.isNecessary()) {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index c2532a6bc..c1506218d 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -554,6 +554,7 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime
             currentChangeSnapshotId);
         state.setCurrentChangeSnapshotId(currentChangeSnapshotId);
         state.setCurrentSnapshotId(currentSnapshotId);
+        return true;
       }
     } else {
       long currentSnapshotId = doRefreshSnapshots((UnkeyedTable) table);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index f5b239ff4..c891a24f4 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -331,7 +331,17 @@ public class TableConfigurations {
             PropertyUtil.propertyAsLong(
                 properties,
                 TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL,
-                TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT));
+                TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT))
+        .setEvaluationFallbackInterval(
+            PropertyUtil.propertyAsLong(
+                properties,
+                TableProperties.SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL,
+                
TableProperties.SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL_DEFAULT))
+        .setEvaluationMseTolerance(
+            PropertyUtil.propertyAsLong(
+                properties,
+                
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
+                
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
   }
 
   /**
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
index f82dca86c..0b96dddd8 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
@@ -228,15 +228,37 @@ public class IcebergTableUtil {
     OptimizingConfig config = tableRuntime.getOptimizingConfig();
     long lastMinor = tableRuntime.getLastMinorOptimizingTime();
     long lastFull = tableRuntime.getLastFullOptimizingTime();
+    long lastMajor = tableRuntime.getLastMajorOptimizingTime();
     if (TableFormat.ICEBERG.equals(table.format())) {
       return new IcebergOptimizerEvaluator(
-          identifier, config, table, snapshot, maxPendingPartitions, 
lastMinor, lastFull);
+          identifier,
+          config,
+          table,
+          snapshot,
+          maxPendingPartitions,
+          lastMinor,
+          lastFull,
+          lastMajor);
     } else if (TableFormat.MIXED_ICEBERG.equals(table.format())) {
       return new MixedIcebergOptimizingEvaluator(
-          identifier, config, table, snapshot, maxPendingPartitions, 
lastMinor, lastFull);
+          identifier,
+          config,
+          table,
+          snapshot,
+          maxPendingPartitions,
+          lastMinor,
+          lastFull,
+          lastMajor);
     } else if (TableFormat.MIXED_HIVE.equals(table.format())) {
       return new MixedHiveOptimizingEvaluator(
-          identifier, config, table, snapshot, maxPendingPartitions, 
lastMinor, lastFull);
+          identifier,
+          config,
+          table,
+          snapshot,
+          maxPendingPartitions,
+          lastMinor,
+          lastFull,
+          lastMajor);
     }
     throw new IllegalStateException("Un-supported table-format:" + 
table.format().toString());
   }
@@ -267,6 +289,7 @@ public class IcebergTableUtil {
     OptimizingConfig config = tableRuntime.getOptimizingConfig();
     long lastMinor = tableRuntime.getLastMinorOptimizingTime();
     long lastFull = tableRuntime.getLastFullOptimizingTime();
+    long lastMajor = tableRuntime.getLastMajorOptimizingTime();
     TableSnapshot snapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
     if (TableFormat.ICEBERG.equals(table.format())) {
       return new IcebergOptimizingPlanner(
@@ -279,7 +302,8 @@ public class IcebergTableUtil {
           availableCore,
           maxInputSizePerThread,
           lastMinor,
-          lastFull);
+          lastFull,
+          lastMajor);
     } else if (TableFormat.MIXED_ICEBERG.equals(table.format())) {
       return new MixedIcebergOptimizingPlanner(
           identifier,
@@ -291,7 +315,8 @@ public class IcebergTableUtil {
           availableCore,
           maxInputSizePerThread,
           lastMinor,
-          lastFull);
+          lastFull,
+          lastMajor);
     } else if (TableFormat.MIXED_HIVE.equals(table.format())) {
       return new MixedHiveOptimizingPlanner(
           identifier,
@@ -303,7 +328,8 @@ public class IcebergTableUtil {
           availableCore,
           maxInputSizePerThread,
           lastMinor,
-          lastFull);
+          lastFull,
+          lastMajor);
     }
     throw new IllegalStateException("Unsupported table format:" + 
table.format().toString());
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
index 387b18231..b6eb39a10 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
@@ -91,7 +91,8 @@ public class TestHiveKeyedPartitionPlan extends 
TestKeyedPartitionPlan {
         hiveLocation,
         System.currentTimeMillis(),
         getTableRuntime().getLastMinorOptimizingTime(),
-        getTableRuntime().getLastFullOptimizingTime());
+        getTableRuntime().getLastFullOptimizingTime(),
+        getTableRuntime().getLastMajorOptimizingTime());
   }
 
   @Test
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
index 2d0e4cfc1..cede17360 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
@@ -90,7 +90,8 @@ public class TestHiveUnkeyedPartitionPlan extends 
TestUnkeyedPartitionPlan {
         hiveLocation,
         System.currentTimeMillis(),
         getTableRuntime().getLastMinorOptimizingTime(),
-        getTableRuntime().getLastFullOptimizingTime());
+        getTableRuntime().getLastFullOptimizingTime(),
+        getTableRuntime().getLastMajorOptimizingTime());
   }
 
   @Test
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
index 3d67b62c8..7ad399709 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
@@ -83,7 +83,8 @@ public class TestIcebergPartitionPlan extends 
TestUnkeyedPartitionPlan {
         getPartition(),
         System.currentTimeMillis(),
         tableRuntime.getLastMinorOptimizingTime(),
-        tableRuntime.getLastFullOptimizingTime());
+        tableRuntime.getLastFullOptimizingTime(),
+        tableRuntime.getLastMajorOptimizingTime());
   }
 
   @Override
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
index 1984f31d8..d80a1e57e 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -222,7 +222,8 @@ public class TestKeyedPartitionPlan extends 
MixedTablePlanTestBase {
         getPartition(),
         System.currentTimeMillis(),
         getTableRuntime().getLastMinorOptimizingTime(),
-        getTableRuntime().getLastFullOptimizingTime());
+        getTableRuntime().getLastFullOptimizingTime(),
+        getTableRuntime().getLastMajorOptimizingTime());
   }
 
   @Override
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
index 558ace42b..0ad80622c 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
@@ -88,7 +88,8 @@ public class TestUnkeyedPartitionPlan extends 
MixedTablePlanTestBase {
         getPartition(),
         System.currentTimeMillis(),
         getTableRuntime().getLastMinorOptimizingTime(),
-        getTableRuntime().getLastFullOptimizingTime());
+        getTableRuntime().getLastFullOptimizingTime(),
+        getTableRuntime().getLastMajorOptimizingTime());
   }
 
   @Override
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java 
b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
index c5f8ef77e..0c743ac6b 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
@@ -91,6 +91,12 @@ public class OptimizingConfig {
   // self-optimizing.min-plan-interval
   private long minPlanInterval;
 
+  // self-optimizing.evaluation.file-size.mse-tolerance
+  private long evaluationMseTolerance;
+
+  // self-optimizing.evaluation.fallback-interval
+  private long evaluationFallbackInterval;
+
   public OptimizingConfig() {}
 
   public boolean isEnabled() {
@@ -290,6 +296,28 @@ public class OptimizingConfig {
     return this;
   }
 
+  public long getEvaluationFallbackInterval() {
+    return evaluationFallbackInterval;
+  }
+
+  public OptimizingConfig setEvaluationFallbackInterval(long 
evaluationFallbackInterval) {
+    this.evaluationFallbackInterval = evaluationFallbackInterval;
+    return this;
+  }
+
+  public boolean isMetadataBasedTriggerEnabled() {
+    return evaluationFallbackInterval >= 0;
+  }
+
+  public long getEvaluationMseTolerance() {
+    return evaluationMseTolerance;
+  }
+
+  public OptimizingConfig setEvaluationMseTolerance(long 
evaluationMseTolerance) {
+    this.evaluationMseTolerance = evaluationMseTolerance;
+    return this;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -320,7 +348,9 @@ public class OptimizingConfig {
         && baseRefreshInterval == that.baseRefreshInterval
         && hiveRefreshInterval == that.hiveRefreshInterval
         && Objects.equal(optimizerGroup, that.optimizerGroup)
-        && Objects.equal(minPlanInterval, that.minPlanInterval);
+        && Objects.equal(minPlanInterval, that.minPlanInterval)
+        && Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance)
+        && Objects.equal(evaluationFallbackInterval, 
that.evaluationFallbackInterval);
   }
 
   @Override
@@ -347,7 +377,9 @@ public class OptimizingConfig {
         baseHashBucket,
         baseRefreshInterval,
         hiveRefreshInterval,
-        minPlanInterval);
+        minPlanInterval,
+        evaluationMseTolerance,
+        evaluationFallbackInterval);
   }
 
   @Override
@@ -373,6 +405,8 @@ public class OptimizingConfig {
         .add("baseHashBucket", baseHashBucket)
         .add("baseRefreshInterval", baseRefreshInterval)
         .add("hiveRefreshInterval", hiveRefreshInterval)
+        .add("evaluationMseTolerance", evaluationMseTolerance)
+        .add("evaluationFallbackInterval", evaluationFallbackInterval)
         .toString();
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MetadataBasedEvaluationEvent.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MetadataBasedEvaluationEvent.java
new file mode 100644
index 000000000..d22d92bc9
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MetadataBasedEvaluationEvent.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import static org.apache.amoro.TableFormat.ICEBERG;
+import static org.apache.amoro.TableFormat.MIXED_HIVE;
+import static org.apache.amoro.TableFormat.MIXED_ICEBERG;
+
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.table.MixedTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataBasedEvaluationEvent {
+  private static final Logger logger = 
LoggerFactory.getLogger(MetadataBasedEvaluationEvent.class);
+
+  public static boolean isReachFallbackInterval(OptimizingConfig config, long 
lastPlanTime) {
+    long fallbackInterval = config.getEvaluationFallbackInterval();
+    return fallbackInterval >= 0 && System.currentTimeMillis() - lastPlanTime 
>= fallbackInterval;
+  }
+
+  public static boolean isEvaluatingNecessary(
+      OptimizingConfig config, MixedTable table, long lastPlanTime) {
+    // Step 1: Perform periodic scheduling according to the fallback interval 
to avoid false
+    // positives or
+    // missed triggers based on metadata metric-driven evaluation
+    if (isReachFallbackInterval(config, lastPlanTime)) {
+      logger.debug("Maximum interval for evaluating table {} has reached.", 
table.id());
+      return true;
+    }
+
+    TableStatsProvider.BasicFileStats basicStats = getTableStats(table);
+    // Currently only supports ICEBERG and mixed formats (MIXED_ICEBERG, 
MIXED_HIVE).
+    // For other formats this will return null.
+    if (basicStats != null) {
+      // Step 2: Empty table should skip evaluating pending input
+      //    TableStatsProvider provider = getTableStatsProvider(table);
+      if (basicStats.dataFileCnt == 0) {
+        logger.debug("Table {} contains no data files, skip evaluating pending 
input.", table.id());
+        return false;
+      }
+
+      // Step 3: If the condition `delete file=0 && avg file size > target 
size * ratio` is
+      // satisfied,
+      // then evaluating the pending input is considered unnecessary and will 
be skipped.
+      long minTargetSize = (long) (config.getTargetSize() * 
config.getMinTargetSizeRatio());
+      double avgFileSize =
+          basicStats.dataFileCnt > 0
+              ? (double) basicStats.totalFileSize / basicStats.dataFileCnt
+              : 0;
+
+      if (basicStats.deleteFileCnt == 0 && avgFileSize > minTargetSize) {
+        logger.debug(
+            "Table {} contains only appended data and no deleted files 
(average file size: {}), skip evaluating pending input.",
+            table.id(),
+            avgFileSize);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static TableStatsProvider.BasicFileStats getTableStats(MixedTable 
table) {
+    TableFormat format = table.format();
+    if (format.equals(ICEBERG) || format.equals(MIXED_ICEBERG) || 
format.equals(MIXED_HIVE)) {
+      TableStatsProvider provider = MixedAndIcebergTableStatsProvider.INSTANCE;
+      return provider.collect(table);
+    } else {
+      // Unsupported table format for metadata-based evaluation
+      // PAIMON and HUDI formats are currently not supported because obtaining 
basic statistics
+      // requires traversing manifest files, which would add extra planning 
overhead.
+      // In the future, if there is a more efficient way to get these stats,
+      // we can extend TableStatsProvider to support these formats.
+      return null;
+    }
+  }
+
+  public static boolean isPartitionPendingNecessary(
+      OptimizingConfig config, long partitionSquaredErrorSum, long 
partitionFileCount) {
+    long mseTolerance = config.getEvaluationMseTolerance();
+    return partitionFileCount > 1
+        && (mseTolerance == 0
+            || (double) partitionSquaredErrorSum / partitionFileCount
+                >= (double) mseTolerance * mseTolerance);
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MixedAndIcebergTableStatsProvider.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MixedAndIcebergTableStatsProvider.java
new file mode 100644
index 000000000..8ec04e12b
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MixedAndIcebergTableStatsProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import org.apache.amoro.table.MixedTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.util.PropertyUtil;
+
+import java.util.Map;
+
+public class MixedAndIcebergTableStatsProvider extends TableStatsProvider {
+
+  public static final MixedAndIcebergTableStatsProvider INSTANCE =
+      new MixedAndIcebergTableStatsProvider();
+
+  private MixedAndIcebergTableStatsProvider() {}
+
+  @Override
+  public BasicFileStats collect(MixedTable table) {
+    BasicFileStats stats = new BasicFileStats();
+    if (table.isUnkeyedTable()) {
+      acceptSnapshotIfPresent(stats, table.asUnkeyedTable().currentSnapshot());
+    } else {
+      acceptSnapshotIfPresent(stats, 
table.asKeyedTable().baseTable().currentSnapshot());
+      acceptSnapshotIfPresent(stats, 
table.asKeyedTable().changeTable().currentSnapshot());
+    }
+    return stats;
+  }
+
+  private void acceptSnapshotIfPresent(BasicFileStats stats, Snapshot 
snapshot) {
+    if (snapshot != null) {
+      stats.accept(snapshot.summary());
+    }
+  }
+
+  public static class BasicFileStats extends TableStatsProvider.BasicFileStats 
{
+    public void accept(Map<String, String> summary) {
+      deleteFileCnt +=
+          PropertyUtil.propertyAsInt(summary, 
SnapshotSummary.TOTAL_DELETE_FILES_PROP, 0);
+      dataFileCnt += PropertyUtil.propertyAsInt(summary, 
SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+      totalFileSize +=
+          PropertyUtil.propertyAsLong(summary, 
SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+    }
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/TableStatsProvider.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/TableStatsProvider.java
new file mode 100644
index 000000000..99411eff2
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/TableStatsProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import org.apache.amoro.table.MixedTable;
+
+public abstract class TableStatsProvider {
+  /** Collect basic file statistics for the given table. */
+  public abstract BasicFileStats collect(MixedTable table);
+
+  /** Statistics representation of basic file metrics. */
+  public static class BasicFileStats {
+    /** Count of delete files. */
+    int deleteFileCnt = 0;
+    /** Count of data files. */
+    int dataFileCnt = 0;
+    /** Total size of all files in bytes. */
+    long totalFileSize = 0;
+
+    @VisibleForTesting
+    BasicFileStats() {}
+
+    public void accept(int dataFileCnt, int deleteFileCnt, long totalFileSize) 
{
+      this.dataFileCnt += dataFileCnt;
+      this.deleteFileCnt += deleteFileCnt;
+      this.totalFileSize += totalFileSize;
+    }
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
index ed108c94c..5f9061987 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
@@ -61,6 +61,7 @@ public abstract class AbstractOptimizingEvaluator {
   protected final TableSnapshot currentSnapshot;
   protected final long lastFullOptimizingTime;
   protected final long lastMinorOptimizingTime;
+  protected final long lastMajorOptimizingTime;
   protected final int maxPendingPartitions;
   protected boolean isInitialized = false;
   protected Map<String, PartitionEvaluator> needOptimizingPlanMap = 
Maps.newHashMap();
@@ -73,7 +74,8 @@ public abstract class AbstractOptimizingEvaluator {
       TableSnapshot currentSnapshot,
       int maxPendingPartitions,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     this.identifier = identifier;
     this.config = config;
     this.mixedTable = table;
@@ -81,6 +83,7 @@ public abstract class AbstractOptimizingEvaluator {
     this.maxPendingPartitions = maxPendingPartitions;
     this.lastFullOptimizingTime = lastFullOptimizingTime;
     this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+    this.lastMajorOptimizingTime = lastMajorOptimizingTime;
   }
 
   protected void initEvaluator() {
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
index 69380c2c9..05bf82fab 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
@@ -65,7 +65,8 @@ public abstract class AbstractOptimizingPlanner extends 
AbstractOptimizingEvalua
       double availableCore,
       long maxInputSizePerThread,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -73,7 +74,8 @@ public abstract class AbstractOptimizingPlanner extends 
AbstractOptimizingEvalua
         snapshot,
         Integer.MAX_VALUE,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
     this.partitionFilter = partitionFilter;
     this.availableCore = availableCore;
     this.planTime = System.currentTimeMillis();
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
index 0c0c6a422..313076339 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
@@ -51,6 +51,7 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
   protected final ServerTableIdentifier identifier;
   protected final long lastMinorOptimizingTime;
   protected final long lastFullOptimizingTime;
+  protected final long lastMajorOptimizingTime;
   private CommonPartitionEvaluator evaluator;
   private TaskSplitter taskSplitter;
   protected MixedTable tableObject;
@@ -82,7 +83,8 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
       Pair<Integer, StructLike> partition,
       long planTime,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     this.identifier = identifier;
     this.partition = partition;
     this.tableObject = table;
@@ -90,6 +92,7 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
     this.planTime = planTime;
     this.lastMinorOptimizingTime = lastMinorOptimizingTime;
     this.lastFullOptimizingTime = lastFullOptimizingTime;
+    this.lastMajorOptimizingTime = lastMajorOptimizingTime;
   }
 
   @Override
@@ -106,7 +109,13 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
 
   protected CommonPartitionEvaluator buildEvaluator() {
     return new CommonPartitionEvaluator(
-        identifier, config, partition, planTime, lastMinorOptimizingTime, 
lastFullOptimizingTime);
+        identifier,
+        config,
+        partition,
+        planTime,
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
index 7f1f44b19..2cd11cbd5 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
@@ -22,6 +22,7 @@ import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.optimizing.HealthScoreInfo;
 import org.apache.amoro.optimizing.OptimizingType;
+import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -47,6 +48,7 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
   protected final OptimizingConfig config;
   protected final long lastFullOptimizingTime;
   protected final long lastMinorOptimizingTime;
+  protected final long lastMajorOptimizingTime;
   protected final long fragmentSize;
   protected final long minTargetSize;
   protected final long planTime;
@@ -80,6 +82,9 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
   protected long posDeleteFileSize = 0L;
   protected long posDeleteFileRecords = 0L;
 
+  // mse stat
+  protected long fileSizeSquaredErrorSum = 0L;
+
   private long cost = -1;
   private Boolean necessary = null;
   private OptimizingType optimizingType = null;
@@ -91,7 +96,8 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
       Pair<Integer, StructLike> partition,
       long planTime,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     this.identifier = identifier;
     this.config = config;
     this.partition = partition;
@@ -104,6 +110,7 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
     }
     this.planTime = planTime;
     this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+    this.lastMajorOptimizingTime = lastMajorOptimizingTime;
     this.lastFullOptimizingTime = lastFullOptimizingTime;
     this.reachFullInterval =
         config.getFullTriggerInterval() >= 0
@@ -128,6 +135,11 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
     if (!config.isEnabled()) {
       return false;
     }
+    if (config.isMetadataBasedTriggerEnabled() && 
config.getEvaluationMseTolerance() > 0) {
+      // Update the file size squared error sum
+      updateFileSizeSquaredErrorSum(dataFile);
+    }
+
     if (isFragmentFile(dataFile)) {
       return addFragmentFile(dataFile, deletes);
     } else if (isUndersizedSegmentFile(dataFile)) {
@@ -208,6 +220,31 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
     return false;
   }
 
+  private void updateFileSizeSquaredErrorSum(DataFile dataFile) {
+    // Only accumulate squared error for files smaller than `minTargetSize`
+    // For files larger than or equal to minTargetSize, diffSize will be 0, 
contributing nothing to
+    // the error sum
+    long diffSize = minTargetSize - Math.min(dataFile.fileSizeInBytes(), 
minTargetSize);
+    if (diffSize <= 0) {
+      return;
+    }
+
+    // Prevent overflow for diffSize * diffSize by saturating to Long.MAX_VALUE
+    final long prod;
+    if (diffSize > Long.MAX_VALUE / diffSize) {
+      prod = Long.MAX_VALUE;
+    } else {
+      prod = diffSize * diffSize;
+    }
+
+    // Prevent overflow when adding to fileSizeSquaredErrorSum (saturate to 
Long.MAX_VALUE)
+    if (fileSizeSquaredErrorSum > Long.MAX_VALUE - prod) {
+      fileSizeSquaredErrorSum = Long.MAX_VALUE;
+    } else {
+      fileSizeSquaredErrorSum += prod;
+    }
+  }
+
   protected boolean fileShouldFullOptimizing(DataFile dataFile, 
List<ContentFile<?>> deleteFiles) {
     if (config.isFullRewriteAllFiles()) {
       return true;
@@ -288,6 +325,19 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
   @Override
   public boolean isNecessary() {
     if (necessary == null) {
+      long lastPlanTime =
+          Math.max(
+              Math.max(lastMinorOptimizingTime, lastMajorOptimizingTime), 
lastFullOptimizingTime);
+      if (config.isMetadataBasedTriggerEnabled()
+          && !MetadataBasedEvaluationEvent.isReachFallbackInterval(config, 
lastPlanTime)) {
+        long fileCount = fragmentFileCount + undersizedSegmentFileCount;
+        if (!MetadataBasedEvaluationEvent.isPartitionPendingNecessary(
+            config, fileSizeSquaredErrorSum, fileCount)) {
+          LOG.debug("{} not necessary due to metadata-based evaluation, {}", 
name(), this);
+          necessary = false;
+          return false;
+        }
+      }
       if (isFullOptimizing()) {
         necessary = isFullNecessary();
       } else {
@@ -509,6 +559,10 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
     return posDeleteFileRecords;
   }
 
+  public long getFileSizeSquaredErrorSum() {
+    return fileSizeSquaredErrorSum;
+  }
+
   public static class Weight implements PartitionEvaluator.Weight {
 
     private final long cost;
@@ -532,6 +586,7 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
         .add("undersizedSegmentSize", minTargetSize)
         .add("planTime", planTime)
         .add("lastMinorOptimizeTime", lastMinorOptimizingTime)
+        .add("lastMajorOptimizeTime", lastMajorOptimizingTime)
         .add("lastFullOptimizeTime", lastFullOptimizingTime)
         .add("fragmentFileCount", fragmentFileCount)
         .add("fragmentFileSize", fragmentFileSize)
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
index 0f11a020d..cf6f5aec8 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
@@ -33,7 +33,8 @@ public class IcebergOptimizerEvaluator extends 
AbstractOptimizingEvaluator {
       TableSnapshot currentSnapshot,
       int maxPendingPartitions,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -41,7 +42,8 @@ public class IcebergOptimizerEvaluator extends 
AbstractOptimizingEvaluator {
         currentSnapshot,
         maxPendingPartitions,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
@@ -52,6 +54,7 @@ public class IcebergOptimizerEvaluator extends 
AbstractOptimizingEvaluator {
         partition,
         System.currentTimeMillis(),
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
index 21833283f..aed5ec65d 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
@@ -37,7 +37,8 @@ public class IcebergOptimizingPlanner extends 
AbstractOptimizingPlanner {
       double availableCore,
       long maxInputSizePerThread,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -48,7 +49,8 @@ public class IcebergOptimizingPlanner extends 
AbstractOptimizingPlanner {
         availableCore,
         maxInputSizePerThread,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
@@ -60,6 +62,7 @@ public class IcebergOptimizingPlanner extends 
AbstractOptimizingPlanner {
         partition,
         planTime,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
index 998d08624..f1eb92104 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
@@ -40,7 +40,8 @@ public class IcebergPartitionPlan extends 
AbstractPartitionPlan {
       Pair<Integer, StructLike> partition,
       long planTime,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         table,
@@ -48,7 +49,8 @@ public class IcebergPartitionPlan extends 
AbstractPartitionPlan {
         partition,
         planTime,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
index 9f6bd279d..0c33e0b9b 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
@@ -36,7 +36,8 @@ public class MixedIcebergOptimizingEvaluator extends 
AbstractOptimizingEvaluator
       TableSnapshot currentSnapshot,
       int maxPendingPartitions,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -44,7 +45,8 @@ public class MixedIcebergOptimizingEvaluator extends 
AbstractOptimizingEvaluator
         currentSnapshot,
         maxPendingPartitions,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   protected Map<String, String> partitionProperties(Pair<Integer, StructLike> 
partition) {
@@ -62,6 +64,7 @@ public class MixedIcebergOptimizingEvaluator extends 
AbstractOptimizingEvaluator
         System.currentTimeMillis(),
         mixedTable.isKeyedTable(),
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
index c33072262..8e18b2f68 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
@@ -37,7 +37,8 @@ public class MixedIcebergOptimizingPlanner extends 
AbstractOptimizingPlanner {
       double availableCore,
       long maxInputSizePerThread,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -48,7 +49,8 @@ public class MixedIcebergOptimizingPlanner extends 
AbstractOptimizingPlanner {
         availableCore,
         maxInputSizePerThread,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
@@ -60,6 +62,7 @@ public class MixedIcebergOptimizingPlanner extends 
AbstractOptimizingPlanner {
         partition,
         planTime,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
index 5d77f76ff..ae116be6e 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -56,7 +56,8 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
       Pair<Integer, StructLike> partition,
       long planTime,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         table,
@@ -64,7 +65,8 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
         partition,
         planTime,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
     this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, 
partition.second());
   }
 
@@ -121,7 +123,8 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
         planTime,
         isKeyedTable(),
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   public static class MixedIcebergPartitionEvaluator extends 
CommonPartitionEvaluator {
@@ -137,9 +140,16 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
         long planTime,
         boolean keyedTable,
         long lastMinorOptimizingTime,
-        long lastFullOptimizingTime) {
+        long lastFullOptimizingTime,
+        long lastMajorOptimizingTime) {
       super(
-          identifier, config, partition, planTime, lastMinorOptimizingTime, 
lastFullOptimizingTime);
+          identifier,
+          config,
+          partition,
+          planTime,
+          lastMinorOptimizingTime,
+          lastFullOptimizingTime,
+          lastMajorOptimizingTime);
       this.keyedTable = keyedTable;
       String optimizedTime = 
partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME);
       long lastBaseOptimizedTime = optimizedTime == null ? 0 : 
Long.parseLong(optimizedTime);
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 8252147b3..0d804cdec 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -133,6 +133,19 @@ public class TableProperties {
       "self-optimizing.min-plan-interval";
   public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;
 
+  /** metadata-based evaluation related properties */
+  public static final String SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL =
+      "self-optimizing.evaluation.fallback-interval"; // fallback evaluation 
interval in
+  // milliseconds
+
+  public static final int SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL_DEFAULT 
=
+      -1; // metadata-based evaluation not in effect
+
+  public static final String 
SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE =
+      "self-optimizing.evaluation.file-size.mse-tolerance"; // Mean Squared 
Error (MSE) tolerance
+  // for file size evaluation
+  public static final long 
SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT = 0;
+
   /** table clean related properties */
   public static final String ENABLE_TABLE_EXPIRE = "table-expire.enabled";
 
diff --git 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/evaluation/TestMetadataBasedEvaluationEvent.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/evaluation/TestMetadataBasedEvaluationEvent.java
new file mode 100644
index 000000000..49229d481
--- /dev/null
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/evaluation/TestMetadataBasedEvaluationEvent.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.catalog.TableTestBase;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.data.ChangeAction;
+import org.apache.amoro.iceberg.Constants;
+import org.apache.amoro.io.IcebergDataTestHelpers;
+import org.apache.amoro.io.MixedDataTestHelpers;
+import org.apache.amoro.io.writer.RecordWithAction;
+import org.apache.amoro.optimizing.plan.CommonPartitionEvaluator;
+import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
+import org.apache.amoro.optimizing.plan.PartitionEvaluator;
+import org.apache.amoro.optimizing.scan.IcebergTableFileScanHelper;
+import org.apache.amoro.optimizing.scan.KeyedTableFileScanHelper;
+import org.apache.amoro.optimizing.scan.TableFileScanHelper;
+import org.apache.amoro.optimizing.scan.UnkeyedTableFileScanHelper;
+import org.apache.amoro.properties.HiveTableProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.KeyedTableSnapshot;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableProperties;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.amoro.utils.TablePropertyUtil;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class TestMetadataBasedEvaluationEvent extends TableTestBase {
+
+  public TestMetadataBasedEvaluationEvent(
+      CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+    super(catalogTestHelper, tableTestHelper);
+  }
+
+  public static final Schema TABLE_SCHEMA =
+      new Schema(
+          Lists.newArrayList(
+              Types.NestedField.required(1, "id", Types.IntegerType.get()),
+              Types.NestedField.required(2, "name", Types.StringType.get()),
+              Types.NestedField.required(3, "ts", Types.LongType.get()),
+              Types.NestedField.required(4, "op_time", 
Types.TimestampType.withoutZone())),
+          Sets.newHashSet(1, 2, 3, 4));
+  public static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(TABLE_SCHEMA).day("op_time").build();
+
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      {
+        new BasicCatalogTestHelper(TableFormat.ICEBERG),
+        new BasicTableTestHelper(TABLE_SCHEMA, true, SPEC)
+      },
+      {new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), new 
BasicTableTestHelper(true, true)}
+    };
+  }
+
+  public void initData() throws IOException {
+    if (getMixedTable().isKeyedTable()) {
+      writeBaseStore(getMixedTable().asKeyedTable(), initRecords(1, 4, 0, 
"2022-01-01T12:00:00"));
+      writeBaseStore(getMixedTable().asKeyedTable(), initRecords(5, 8, 0, 
"2022-01-01T12:00:00"));
+
+    } else {
+      write(getMixedTable().asUnkeyedTable(), initRecords(1, "aaa", 0, 1, 
ChangeAction.INSERT));
+      write(getMixedTable().asUnkeyedTable(), initRecords(2, "bbb", 0, 1, 
ChangeAction.INSERT));
+    }
+  }
+
+  private List<RecordWithAction> initRecords(
+      int id, String name, long ts, int day, ChangeAction action) {
+    ImmutableList.Builder<RecordWithAction> builder = ImmutableList.builder();
+    builder.add(
+        new RecordWithAction(
+            MixedDataTestHelpers.createRecord(
+                id, name, ts, String.format("2022-01-%02dT12:00:00", day)),
+            action));
+
+    return builder.build();
+  }
+
+  private List<Record> initRecords(int from, int to, long ts, String opTime) {
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    for (int i = from; i <= to; i++) {
+      builder.add(tableTestHelper().generateTestRecord(i, i + "", ts, opTime));
+    }
+
+    return builder.build();
+  }
+
+  private void write(UnkeyedTable table, List<RecordWithAction> list) throws 
IOException {
+    WriteResult result = IcebergDataTestHelpers.delta(table, list);
+
+    RowDelta rowDelta = table.newRowDelta();
+    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
+    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+  }
+
+  private void writeBaseStore(KeyedTable keyedTable, List<Record> records) {
+    List<DataFile> baseFiles =
+        tableTestHelper()
+            .writeBaseStore(keyedTable, keyedTable.beginTransaction(""), 
records, false);
+    AppendFiles baseAppend = keyedTable.baseTable().newAppend();
+    baseFiles.forEach(baseAppend::appendFile);
+    baseAppend.commit();
+  }
+
+  private void writeChangeStore(KeyedTable keyedTable, List<DataFile> 
dataFiles) {
+    AppendFiles appendFiles = keyedTable.changeTable().newAppend();
+    dataFiles.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+  }
+
+  public void addChangeStoreData() throws IOException {
+    MixedTable mixedTable = getMixedTable();
+    if (mixedTable.isUnkeyedTable()) {
+      write(mixedTable.asUnkeyedTable(), initRecords(1, "aaa", 0, 1, 
ChangeAction.DELETE));
+      write(mixedTable.asUnkeyedTable(), initRecords(2, "ccc", 0, 1, 
ChangeAction.UPDATE_AFTER));
+    } else {
+      writeChangeStore(
+          mixedTable.asKeyedTable(),
+          tableTestHelper()
+              .writeChangeStore(
+                  mixedTable.asKeyedTable(),
+                  mixedTable.asKeyedTable().beginTransaction(""),
+                  ChangeAction.INSERT,
+                  initRecords(1, 2, 0, "2022-01-01T12:00:00"),
+                  false));
+      writeChangeStore(
+          mixedTable.asKeyedTable(),
+          tableTestHelper()
+              .writeChangeStore(
+                  mixedTable.asKeyedTable(),
+                  mixedTable.asKeyedTable().beginTransaction(""),
+                  ChangeAction.INSERT,
+                  initRecords(3, 8, 0, "2022-01-01T12:00:00"),
+                  false));
+      writeChangeStore(
+          mixedTable.asKeyedTable(),
+          tableTestHelper()
+              .writeChangeStore(
+                  mixedTable.asKeyedTable(),
+                  mixedTable.asKeyedTable().beginTransaction(""),
+                  ChangeAction.DELETE,
+                  initRecords(1, 1, 0, "2022-01-01T12:00:00"),
+                  false));
+      writeChangeStore(
+          mixedTable.asKeyedTable(),
+          tableTestHelper()
+              .writeChangeStore(
+                  mixedTable.asKeyedTable(),
+                  mixedTable.asKeyedTable().beginTransaction(""),
+                  ChangeAction.DELETE,
+                  initRecords(2, 2, 0, "2022-01-01T12:00:00"),
+                  false));
+      writeChangeStore(
+          mixedTable.asKeyedTable(),
+          tableTestHelper()
+              .writeChangeStore(
+                  mixedTable.asKeyedTable(),
+                  mixedTable.asKeyedTable().beginTransaction(""),
+                  ChangeAction.DELETE,
+                  initRecords(3, 4, 0, "2022-01-01T12:00:00"),
+                  false));
+    }
+  }
+
+  @Test
+  public void test_evaluating_metadataBasedTriggerEnabled() {
+    OptimizingConfig config = getDefaultOptimizingConfig();
+    Assert.assertFalse(config.isMetadataBasedTriggerEnabled());
+
+    config.setEvaluationFallbackInterval(Long.MAX_VALUE);
+    Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+  }
+
+  @Test
+  public void test_evaluating_emptyTable() {
+    // Temporarily set self-optimizing.evaluation.fallback-interval to 
Long.MAX_VALUE to prevent
+    // triggering due to reaching the fallback interval.
+    OptimizingConfig config =
+        
getDefaultOptimizingConfig().setEvaluationFallbackInterval(Long.MAX_VALUE);
+    MixedTable table = getMixedTable();
+
+    // Verify the empty table stats
+    TableStatsProvider.BasicFileStats stats =
+        MixedAndIcebergTableStatsProvider.INSTANCE.collect(table);
+    Assert.assertEquals(0, stats.dataFileCnt);
+    Assert.assertEquals(0, stats.totalFileSize);
+    Assert.assertEquals(0, stats.deleteFileCnt);
+
+    // Verify the empty table evaluation
+    Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+    
Assert.assertFalse(MetadataBasedEvaluationEvent.isReachFallbackInterval(config, 
0L));
+    
Assert.assertFalse(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config, 
table, 0L));
+  }
+
+  @Test
+  public void test_evaluating_nonEmptyTable() throws IOException {
+    initData();
+    OptimizingConfig config = getDefaultOptimizingConfig();
+    MixedTable table = getMixedTable();
+
+    // Verify the nonEmpty table stats
+    TableStatsProvider.BasicFileStats stats =
+        MixedAndIcebergTableStatsProvider.INSTANCE.collect(table);
+    Assert.assertTrue(stats.dataFileCnt > 0);
+    Assert.assertTrue(stats.totalFileSize > 0);
+    Assert.assertEquals(0, stats.deleteFileCnt);
+
+    // 1. Test for metadata-based trigger disabled.
+    config.setEvaluationFallbackInterval(-1);
+    Assert.assertFalse(config.isMetadataBasedTriggerEnabled());
+
+    // 2. Test for metadata-based trigger enabled.
+    // Temporarily set self-optimizing.evaluation.fallback-interval to 
Long.MAX_VALUE to prevent
+    // triggering due to reaching the fallback interval.
+    config.setEvaluationFallbackInterval(Long.MAX_VALUE);
+    Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+    
Assert.assertFalse(MetadataBasedEvaluationEvent.isReachFallbackInterval(config, 
0L));
+
+    // 2.1 Test for evaluating pendingInput necessary.
+    config.setTargetSize(134217728);
+    
Assert.assertTrue(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config, 
table, 0L));
+
+    // 2.2 Test for evaluating pendingInput not necessary
+    // Temporarily set self-optimizing.target-size to a lower value than the 
average file size
+    config.setTargetSize(100);
+    
Assert.assertFalse(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config, 
table, 0L));
+
+    // 2.3 Test for evaluating pendingInput necessary because the fallback 
interval has been
+    // reached.
+    config.setEvaluationFallbackInterval(0L);
+    Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+    
Assert.assertTrue(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config, 
table, 0L));
+  }
+
+  @Test
+  public void test_evaluating_pendingInput_nonEmptyTable() throws IOException {
+    initData();
+    // Set metadata-based trigger enabled and fallback interval not reached.
+    OptimizingConfig config =
+        
getDefaultOptimizingConfig().setEvaluationFallbackInterval(Long.MAX_VALUE);
+    MixedTable table = getMixedTable();
+
+    // 1. Test file size square error sum updates during partition plans 
initialization using
+    // default mse tolerance (=0) , expecting no updates.
+    TableFileScanHelper tableFileScanHelper = initTableFileScanHelper();
+    Map<String, PartitionEvaluator> partitionPlanMap =
+        initPartitionPlans(tableFileScanHelper, config);
+
+    Assert.assertEquals(1, partitionPlanMap.size());
+
+    long sizeSquaredErrorSum =
+        ((CommonPartitionEvaluator) new 
ArrayList<>(partitionPlanMap.values()).get(0))
+            .getFileSizeSquaredErrorSum();
+    Assert.assertEquals(0L, sizeSquaredErrorSum);
+
+    List<PartitionEvaluator> necessaryPartitions =
+        partitionPlanMap.values().stream()
+            .filter(PartitionEvaluator::isNecessary)
+            .collect(Collectors.toList());
+    Assert.assertEquals(1, necessaryPartitions.size());
+
+    // 2. Set mse tolerance > 0 to enabled file size square error sum update 
during initializing
+    // Partition plans.
+    config.setEvaluationMseTolerance(120000000);
+    partitionPlanMap = initPartitionPlans(tableFileScanHelper, config);
+    Assert.assertEquals(1, partitionPlanMap.size());
+
+    sizeSquaredErrorSum =
+        ((CommonPartitionEvaluator) new 
ArrayList<>(partitionPlanMap.values()).get(0))
+            .getFileSizeSquaredErrorSum();
+    Assert.assertTrue(sizeSquaredErrorSum > 0);
+
+    necessaryPartitions =
+        partitionPlanMap.values().stream()
+            .filter(PartitionEvaluator::isNecessary)
+            .collect(Collectors.toList());
+    Assert.assertEquals(0, necessaryPartitions.size());
+
+    // 3. Test the file size variance updated after adding change data.
+    addChangeStoreData();
+    partitionPlanMap = initPartitionPlans(initTableFileScanHelper(), config);
+    Assert.assertEquals(1, partitionPlanMap.size());
+
+    long sizeSquaredErrorSumUpdated1 =
+        ((CommonPartitionEvaluator) new 
ArrayList<>(partitionPlanMap.values()).get(0))
+            .getFileSizeSquaredErrorSum();
+    Assert.assertNotEquals(sizeSquaredErrorSum, sizeSquaredErrorSumUpdated1);
+
+    necessaryPartitions =
+        partitionPlanMap.values().stream()
+            .filter(PartitionEvaluator::isNecessary)
+            .collect(Collectors.toList());
+    Assert.assertEquals(0, necessaryPartitions.size());
+
+    // 4. Set mse tolerance smaller for partitions to test necessary pending.
+    config.setEvaluationMseTolerance(100000000);
+    partitionPlanMap = initPartitionPlans(initTableFileScanHelper(), config);
+
+    long sizeSquaredErrorSumUpdated2 =
+        ((CommonPartitionEvaluator) new 
ArrayList<>(partitionPlanMap.values()).get(0))
+            .getFileSizeSquaredErrorSum();
+    Assert.assertEquals(sizeSquaredErrorSumUpdated2, 
sizeSquaredErrorSumUpdated1);
+
+    necessaryPartitions =
+        partitionPlanMap.values().stream()
+            .filter(PartitionEvaluator::isNecessary)
+            .collect(Collectors.toList());
+    Assert.assertEquals(1, necessaryPartitions.size());
+  }
+
+  private TableFileScanHelper initTableFileScanHelper() {
+    MixedTable mixedTable = getMixedTable();
+    TableFileScanHelper tableFileScanHelper;
+    if (TableFormat.ICEBERG.equals(mixedTable.format())) {
+      tableFileScanHelper =
+          new IcebergTableFileScanHelper(
+              mixedTable.asUnkeyedTable(),
+              mixedTable.asUnkeyedTable().currentSnapshot().snapshotId());
+    } else {
+      if (mixedTable.isUnkeyedTable()) {
+        tableFileScanHelper =
+            new UnkeyedTableFileScanHelper(
+                mixedTable.asUnkeyedTable(),
+                mixedTable.asUnkeyedTable().currentSnapshot().snapshotId());
+      } else {
+        Snapshot currentSnapshot = 
mixedTable.asKeyedTable().baseTable().currentSnapshot();
+        Snapshot changeSnapshot = 
mixedTable.asKeyedTable().changeTable().currentSnapshot();
+
+        tableFileScanHelper =
+            new KeyedTableFileScanHelper(
+                mixedTable.asKeyedTable(),
+                new KeyedTableSnapshot(
+                    currentSnapshot != null
+                        ? currentSnapshot.snapshotId()
+                        : Constants.INVALID_SNAPSHOT_ID,
+                    changeSnapshot != null
+                        ? changeSnapshot.snapshotId()
+                        : Constants.INVALID_SNAPSHOT_ID));
+      }
+    }
+    tableFileScanHelper.withPartitionFilter(Expressions.alwaysTrue());
+
+    return tableFileScanHelper;
+  }
+
+  private Map<String, PartitionEvaluator> initPartitionPlans(
+      TableFileScanHelper tableFileScanHelper, OptimizingConfig config) {
+    Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();
+
+    long count = 0;
+    try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
+        tableFileScanHelper.scan()) {
+      for (TableFileScanHelper.FileScanResult fileScanResult : results) {
+        PartitionSpec partitionSpec = 
tableFileScanHelper.getSpec(fileScanResult.file().specId());
+        StructLike partition = fileScanResult.file().partition();
+        String partitionPath = partitionSpec.partitionToPath(partition);
+        PartitionEvaluator evaluator =
+            partitionPlanMap.computeIfAbsent(
+                partitionPath,
+                ignore -> buildEvaluator(Pair.of(partitionSpec.specId(), 
partition), config));
+        evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles());
+        count++;
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return partitionPlanMap;
+  }
+
+  private PartitionEvaluator buildEvaluator(
+      Pair<Integer, StructLike> partition, OptimizingConfig config) {
+    if (getMixedTable().isUnkeyedTable()) {
+      return new CommonPartitionEvaluator(
+          ServerTableIdentifier.of(TableTestHelper.TEST_TABLE_ID, 
TableFormat.ICEBERG),
+          config,
+          partition,
+          System.currentTimeMillis(),
+          0L,
+          0L,
+          0L);
+    } else {
+      Map<String, String> partitionProperties =
+          TablePropertyUtil.getPartitionProperties(getMixedTable(), 
partition.second());
+      return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(
+          ServerTableIdentifier.of(TableTestHelper.TEST_TABLE_ID, 
TableFormat.MIXED_ICEBERG),
+          config,
+          partition,
+          partitionProperties,
+          System.currentTimeMillis(),
+          getMixedTable().isKeyedTable(),
+          0L,
+          0L,
+          0L);
+    }
+  }
+
+  @Test
+  public void test_setFileSizeMSETolerance() {
+    OptimizingConfig config = new OptimizingConfig();
+    Assert.assertEquals(0, config.getEvaluationMseTolerance());
+
+    config.setEvaluationMseTolerance(1000);
+    Assert.assertEquals(1000, config.getEvaluationMseTolerance());
+
+    config.setEvaluationMseTolerance(140000000);
+    Assert.assertEquals(140000000, config.getEvaluationMseTolerance());
+  }
+
+  @Test
+  public void testBasicStatsAccept() {
+    MixedAndIcebergTableStatsProvider.BasicFileStats stats =
+        new MixedAndIcebergTableStatsProvider.BasicFileStats();
+    // Initial state should be zeros
+    Assert.assertEquals(0, stats.deleteFileCnt);
+    Assert.assertEquals(0, stats.dataFileCnt);
+    Assert.assertEquals(0, stats.totalFileSize);
+    // Create first summary map
+    Map<String, String> summary1 = new HashMap<>();
+    summary1.put("total-delete-files", "5");
+    summary1.put("total-data-files", "10");
+    summary1.put("total-files-size", "1024");
+    stats.accept(summary1);
+    Assert.assertEquals(5, stats.deleteFileCnt);
+    Assert.assertEquals(10, stats.dataFileCnt);
+    Assert.assertEquals(1024, stats.totalFileSize);
+    // Create second summary map to test accumulation
+    Map<String, String> summary2 = new HashMap<>();
+    summary2.put("total-delete-files", "3");
+    summary2.put("total-data-files", "7");
+    summary2.put("total-files-size", "2048");
+    stats.accept(summary2);
+    // Values should be accumulated
+    Assert.assertEquals(8, stats.deleteFileCnt); // 5 + 3
+    Assert.assertEquals(17, stats.dataFileCnt); // 10 + 7
+    Assert.assertEquals(3072, stats.totalFileSize); // 1024 + 2048
+  }
+
+  @Test
+  public void testBasicTableStatsAcceptWithMissingProperties() {
+    MixedAndIcebergTableStatsProvider.BasicFileStats stats =
+        new MixedAndIcebergTableStatsProvider.BasicFileStats();
+    // Summary map with missing properties should use default values
+    Map<String, String> summary = new HashMap<>();
+    // Only provide one property, others should default to 0
+    summary.put("total-data-files", "15");
+    stats.accept(summary);
+    Assert.assertEquals(0, stats.deleteFileCnt); // default value
+    Assert.assertEquals(15, stats.dataFileCnt);
+    Assert.assertEquals(0, stats.totalFileSize); // default value
+  }
+
+  @Test
+  public void testBasicTableStatsAcceptWithInvalidValues() {
+    MixedAndIcebergTableStatsProvider.BasicFileStats stats =
+        new MixedAndIcebergTableStatsProvider.BasicFileStats();
+    // Summary map with invalid numeric values should use default values
+    Map<String, String> summary = new HashMap<>();
+    summary.put("total-delete-files", "invalid");
+    summary.put("total-data-files", "20");
+    summary.put("total-files-size", "not-a-number");
+
+    // Invalid values should throw NumberFormatException
+    Assert.assertThrows(NumberFormatException.class, () -> 
stats.accept(summary));
+  }
+
+  OptimizingConfig getDefaultOptimizingConfig() {
+    return new OptimizingConfig()
+        .setEnabled(TableProperties.ENABLE_SELF_OPTIMIZING_DEFAULT)
+        
.setAllowPartialCommit(TableProperties.SELF_OPTIMIZING_ALLOW_PARTIAL_COMMIT_DEFAULT)
+        
.setMaxExecuteRetryCount(TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT)
+        .setOptimizerGroup(TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT)
+        
.setFragmentRatio(TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT)
+        
.setMinTargetSizeRatio(TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT)
+        .setMaxFileCount(TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT_DEFAULT)
+        .setOpenFileCost(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)
+        .setTargetSize(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT)
+        .setMaxTaskSize(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE_DEFAULT)
+        .setTargetQuota(TableProperties.SELF_OPTIMIZING_QUOTA_DEFAULT)
+        
.setMinorLeastFileCount(TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT)
+        
.setMinorLeastInterval(TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT)
+        .setMajorDuplicateRatio(
+            
TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO_DEFAULT)
+        
.setFullTriggerInterval(TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT)
+        
.setFullRewriteAllFiles(TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT)
+        .setFilter(TableProperties.SELF_OPTIMIZING_FILTER_DEFAULT)
+        .setBaseHashBucket(TableProperties.BASE_FILE_INDEX_HASH_BUCKET_DEFAULT)
+        .setBaseRefreshInterval(TableProperties.BASE_REFRESH_INTERVAL_DEFAULT)
+        
.setHiveRefreshInterval(HiveTableProperties.REFRESH_HIVE_INTERVAL_DEFAULT)
+        
.setMinPlanInterval(TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT)
+        .setEvaluationFallbackInterval(
+            
TableProperties.SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL_DEFAULT)
+        .setEvaluationMseTolerance(
+            
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT);
+  }
+}
diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
index fdee3d7c4..8ff159d96 100644
--- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
+++ 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
@@ -38,7 +38,8 @@ public class MixedHiveOptimizingEvaluator extends 
MixedIcebergOptimizingEvaluato
       TableSnapshot currentSnapshot,
       int maxPendingPartitions,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -46,7 +47,8 @@ public class MixedHiveOptimizingEvaluator extends 
MixedIcebergOptimizingEvaluato
         currentSnapshot,
         maxPendingPartitions,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
@@ -62,6 +64,7 @@ public class MixedHiveOptimizingEvaluator extends 
MixedIcebergOptimizingEvaluato
         System.currentTimeMillis(),
         mixedTable.isKeyedTable(),
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
index aaf6f648a..68a03cb0d 100644
--- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
+++ 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
@@ -42,7 +42,8 @@ public class MixedHiveOptimizingPlanner extends 
AbstractOptimizingPlanner {
       double availableCore,
       long maxInputSizePerThread,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         config,
@@ -53,7 +54,8 @@ public class MixedHiveOptimizingPlanner extends 
AbstractOptimizingPlanner {
         availableCore,
         maxInputSizePerThread,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
     this.hiveLocation = (((SupportHive) mixedTable).hiveLocation());
   }
 
@@ -67,6 +69,7 @@ public class MixedHiveOptimizingPlanner extends 
AbstractOptimizingPlanner {
         hiveLocation,
         planTime,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
index ac17e565c..760c441bf 100644
--- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
+++ 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
@@ -52,7 +52,8 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
       String hiveLocation,
       long planTime,
       long lastMinorOptimizingTime,
-      long lastFullOptimizingTime) {
+      long lastFullOptimizingTime,
+      long lastMajorOptimizingTime) {
     super(
         identifier,
         table,
@@ -60,7 +61,8 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
         partition,
         planTime,
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
     this.hiveLocation = hiveLocation;
   }
 
@@ -112,7 +114,8 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
         planTime,
         isKeyedTable(),
         lastMinorOptimizingTime,
-        lastFullOptimizingTime);
+        lastFullOptimizingTime,
+        lastMajorOptimizingTime);
   }
 
   @Override
@@ -154,7 +157,8 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
         long planTime,
         boolean keyedTable,
         long lastMinorOptimizingTime,
-        long lastFullOptimizingTime) {
+        long lastFullOptimizingTime,
+        long lastMajorOptimizingTime) {
       super(
           identifier,
           config,
@@ -163,7 +167,8 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
           planTime,
           keyedTable,
           lastMinorOptimizingTime,
-          lastFullOptimizingTime);
+          lastFullOptimizingTime,
+          lastMajorOptimizingTime);
       this.hiveLocation = hiveLocation;
       String optimizedTime =
           
partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME);

Reply via email to