This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new db33a52b0 [AMORO-3316]: Decouple planner from table runtime (#3318)
db33a52b0 is described below

commit db33a52b0f9fe66f259bbea84c2e03cee39f63cf
Author: baiyangtx <[email protected]>
AuthorDate: Mon Nov 4 11:38:53 2024 +0800

    [AMORO-3316]: Decouple planner from table runtime (#3318)
    
    * Remove table-runtime from planner and evaluator
    
    * Fix Compile error
    
    * Fix UT Compile error
    
    * fix ut compile error
    
    * spotless
    
    * ut error
---
 .../amoro/server/optimizing/OptimizingQueue.java   |   8 +-
 .../optimizing/plan/AbstractPartitionPlan.java     |  27 +++---
 .../optimizing/plan/CommonPartitionEvaluator.java  |  33 ++++---
 .../optimizing/plan/IcebergPartitionPlan.java      |  19 +++-
 .../optimizing/plan/MixedHivePartitionPlan.java    |  46 +++++++--
 .../optimizing/plan/MixedIcebergPartitionPlan.java |  38 ++++++--
 .../optimizing/plan/OptimizingEvaluator.java       |  61 +++++++++---
 .../server/optimizing/plan/OptimizingPlanner.java  | 105 ++++++++++++++++++---
 .../executor/TableRuntimeRefreshExecutor.java      |   2 +-
 .../optimizing/flow/CompleteOptimizingFlow.java    |   2 +-
 .../plan/TestHiveKeyedPartitionPlan.java           |   7 +-
 .../plan/TestHiveUnkeyedPartitionPlan.java         |   7 +-
 .../optimizing/plan/TestIcebergPartitionPlan.java  |  10 +-
 .../optimizing/plan/TestKeyedPartitionPlan.java    |   8 +-
 .../optimizing/plan/TestOptimizingEvaluator.java   |  12 ++-
 .../optimizing/plan/TestOptimizingPlanner.java     |   2 +-
 .../optimizing/plan/TestUnkeyedPartitionPlan.java  |   8 +-
 17 files changed, 306 insertions(+), 89 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 710ec729d..c024908a7 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -264,13 +264,13 @@ public class OptimizingQueue extends PersistentBase {
     try {
       AmoroTable<?> table = 
tableManager.loadTable(tableRuntime.getTableIdentifier());
       OptimizingPlanner planner =
-          new OptimizingPlanner(
+          OptimizingPlanner.createOptimizingPlanner(
               tableRuntime.refresh(table),
               (MixedTable) table.originalTable(),
               getAvailableCore(),
               maxInputSizePerThread());
       if (planner.isNecessary()) {
-        return new TableOptimizingProcess(planner);
+        return new TableOptimizingProcess(planner, tableRuntime);
       } else {
         tableRuntime.completeEmptyProcess();
         return null;
@@ -371,9 +371,9 @@ public class OptimizingQueue extends PersistentBase {
       }
     }
 
-    public TableOptimizingProcess(OptimizingPlanner planner) {
+    public TableOptimizingProcess(OptimizingPlanner planner, TableRuntime 
tableRuntime) {
       processId = planner.getProcessId();
-      tableRuntime = planner.getTableRuntime();
+      this.tableRuntime = tableRuntime;
       optimizingType = planner.getOptimizingType();
       planTime = planner.getPlanTime();
       targetSnapshotId = planner.getTargetSnapshotId();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
index 262270281..27cebc6d0 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
@@ -18,12 +18,12 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.server.optimizing.OptimizingType;
 import org.apache.amoro.server.optimizing.RewriteStageTask;
-import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -48,7 +48,9 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
 
   protected final Pair<Integer, StructLike> partition;
   protected final OptimizingConfig config;
-  protected final TableRuntime tableRuntime;
+  protected final ServerTableIdentifier identifier;
+  protected final long lastMinorOptimizingTime;
+  protected final long lastFullOptimizingTime;
   private CommonPartitionEvaluator evaluator;
   private TaskSplitter taskSplitter;
   protected MixedTable tableObject;
@@ -74,15 +76,20 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
   protected final Set<String> reservedDeleteFiles = Sets.newHashSet();
 
   public AbstractPartitionPlan(
-      TableRuntime tableRuntime,
+      ServerTableIdentifier identifier,
       MixedTable table,
+      OptimizingConfig config,
       Pair<Integer, StructLike> partition,
-      long planTime) {
+      long planTime,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    this.identifier = identifier;
     this.partition = partition;
     this.tableObject = table;
-    this.config = tableRuntime.getOptimizingConfig();
-    this.tableRuntime = tableRuntime;
+    this.config = config;
     this.planTime = planTime;
+    this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+    this.lastFullOptimizingTime = lastFullOptimizingTime;
   }
 
   @Override
@@ -98,7 +105,8 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
   }
 
   protected CommonPartitionEvaluator buildEvaluator() {
-    return new CommonPartitionEvaluator(tableRuntime, partition, planTime);
+    return new CommonPartitionEvaluator(
+        identifier, config, partition, planTime, lastMinorOptimizingTime, 
lastFullOptimizingTime);
   }
 
   @Override
@@ -317,10 +325,7 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
           MixedTableUtil.getMixedTablePartitionSpecById(tableObject, 
partition.first());
       String partitionPath = spec.partitionToPath(partition.second());
       return new RewriteStageTask(
-          tableRuntime.getTableIdentifier().getId(),
-          partitionPath,
-          input,
-          properties.getProperties());
+          identifier.getId(), partitionPath, input, 
properties.getProperties());
     }
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
index 8038e9761..1c48718a7 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
@@ -18,9 +18,9 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.server.optimizing.OptimizingType;
-import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -40,10 +40,12 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
   private static final Logger LOG = 
LoggerFactory.getLogger(CommonPartitionEvaluator.class);
 
   private final Set<String> deleteFileSet = Sets.newHashSet();
-  protected final TableRuntime tableRuntime;
 
   private final Pair<Integer, StructLike> partition;
+  protected final ServerTableIdentifier identifier;
   protected final OptimizingConfig config;
+  protected final long lastFullOptimizingTime;
+  protected final long lastMinorOptimizingTime;
   protected final long fragmentSize;
   protected final long minTargetSize;
   protected final long planTime;
@@ -83,10 +85,15 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
   private String name;
 
   public CommonPartitionEvaluator(
-      TableRuntime tableRuntime, Pair<Integer, StructLike> partition, long 
planTime) {
+      ServerTableIdentifier identifier,
+      OptimizingConfig config,
+      Pair<Integer, StructLike> partition,
+      long planTime,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    this.identifier = identifier;
+    this.config = config;
     this.partition = partition;
-    this.tableRuntime = tableRuntime;
-    this.config = tableRuntime.getOptimizingConfig();
     this.fragmentSize = config.getTargetSize() / config.getFragmentRatio();
     this.minTargetSize = (long) (config.getTargetSize() * 
config.getMinTargetSizeRatio());
     if (minTargetSize > config.getTargetSize() - fragmentSize) {
@@ -95,10 +102,11 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
               + "the another merge file.");
     }
     this.planTime = planTime;
+    this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+    this.lastFullOptimizingTime = lastFullOptimizingTime;
     this.reachFullInterval =
         config.getFullTriggerInterval() >= 0
-            && planTime - tableRuntime.getLastFullOptimizingTime()
-                > config.getFullTriggerInterval();
+            && planTime - lastFullOptimizingTime > 
config.getFullTriggerInterval();
   }
 
   @Override
@@ -343,7 +351,7 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
 
   protected boolean reachMinorInterval() {
     return config.getMinorLeastInterval() >= 0
-        && planTime - tableRuntime.getLastMinorOptimizingTime() > 
config.getMinorLeastInterval();
+        && planTime - lastMinorOptimizingTime > config.getMinorLeastInterval();
   }
 
   protected boolean reachFullInterval() {
@@ -363,9 +371,7 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
 
   protected String name() {
     if (name == null) {
-      name =
-          String.format(
-              "partition %s of %s", partition, 
tableRuntime.getTableIdentifier().toString());
+      name = String.format("partition %s of %s", partition, 
identifier.toString());
     }
     return name;
   }
@@ -509,9 +515,8 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
         .add("fragmentSize", fragmentSize)
         .add("undersizedSegmentSize", minTargetSize)
         .add("planTime", planTime)
-        .add("lastMinorOptimizeTime", 
tableRuntime.getLastMinorOptimizingTime())
-        .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
-        .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
+        .add("lastMinorOptimizeTime", lastMinorOptimizingTime)
+        .add("lastFullOptimizeTime", lastFullOptimizingTime)
         .add("fragmentFileCount", fragmentFileCount)
         .add("fragmentFileSize", fragmentFileSize)
         .add("fragmentFileRecords", fragmentFileRecords)
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
index 590efa190..239584f85 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
@@ -18,9 +18,10 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
 import org.apache.amoro.optimizing.OptimizingInputProperties;
-import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.table.MixedTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.util.Pair;
@@ -31,11 +32,21 @@ import java.util.stream.Collectors;
 public class IcebergPartitionPlan extends AbstractPartitionPlan {
 
   protected IcebergPartitionPlan(
-      TableRuntime tableRuntime,
+      ServerTableIdentifier identifier,
+      OptimizingConfig config,
       MixedTable table,
       Pair<Integer, StructLike> partition,
-      long planTime) {
-    super(tableRuntime, table, partition, planTime);
+      long planTime,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    super(
+        identifier,
+        table,
+        config,
+        partition,
+        planTime,
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime);
   }
 
   @Override
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
index dba9672e1..a3b9af7bc 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
@@ -18,12 +18,13 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.data.DataFileType;
 import org.apache.amoro.data.PrimaryKeyedFile;
 import org.apache.amoro.hive.utils.HiveTableUtil;
 import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.properties.HiveTableProperties;
-import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
 import org.apache.iceberg.ContentFile;
@@ -40,12 +41,22 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
   private String customHiveSubdirectory;
 
   public MixedHivePartitionPlan(
-      TableRuntime tableRuntime,
+      ServerTableIdentifier identifier,
       MixedTable table,
+      OptimizingConfig config,
       Pair<Integer, StructLike> partition,
       String hiveLocation,
-      long planTime) {
-    super(tableRuntime, table, partition, planTime);
+      long planTime,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    super(
+        identifier,
+        table,
+        config,
+        partition,
+        planTime,
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime);
     this.hiveLocation = hiveLocation;
   }
 
@@ -89,7 +100,15 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
   @Override
   protected CommonPartitionEvaluator buildEvaluator() {
     return new MixedHivePartitionEvaluator(
-        tableRuntime, partition, partitionProperties, hiveLocation, planTime, 
isKeyedTable());
+        identifier,
+        config,
+        partition,
+        partitionProperties,
+        hiveLocation,
+        planTime,
+        isKeyedTable(),
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime);
   }
 
   @Override
@@ -121,13 +140,24 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
     private boolean filesNotInHiveLocation = false;
 
     public MixedHivePartitionEvaluator(
-        TableRuntime tableRuntime,
+        ServerTableIdentifier identifier,
+        OptimizingConfig config,
         Pair<Integer, StructLike> partition,
         Map<String, String> partitionProperties,
         String hiveLocation,
         long planTime,
-        boolean keyedTable) {
-      super(tableRuntime, partition, partitionProperties, planTime, 
keyedTable);
+        boolean keyedTable,
+        long lastMinorOptimizingTime,
+        long lastFullOptimizingTime) {
+      super(
+          identifier,
+          config,
+          partition,
+          partitionProperties,
+          planTime,
+          keyedTable,
+          lastMinorOptimizingTime,
+          lastFullOptimizingTime);
       this.hiveLocation = hiveLocation;
       String optimizedTime =
           
partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
index 9892b3d92..282214fff 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -18,12 +18,13 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.data.DataFileType;
 import org.apache.amoro.data.DataTreeNode;
 import org.apache.amoro.data.PrimaryKeyedFile;
 import org.apache.amoro.hive.optimizing.MixFormatRewriteExecutorFactory;
 import org.apache.amoro.optimizing.OptimizingInputProperties;
-import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -49,11 +50,21 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
   protected final Map<String, String> partitionProperties;
 
   public MixedIcebergPartitionPlan(
-      TableRuntime tableRuntime,
+      ServerTableIdentifier identifier,
       MixedTable table,
+      OptimizingConfig config,
       Pair<Integer, StructLike> partition,
-      long planTime) {
-    super(tableRuntime, table, partition, planTime);
+      long planTime,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    super(
+        identifier,
+        table,
+        config,
+        partition,
+        planTime,
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime);
     this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, 
partition.second());
   }
 
@@ -101,7 +112,14 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
   @Override
   protected CommonPartitionEvaluator buildEvaluator() {
     return new MixedIcebergPartitionEvaluator(
-        tableRuntime, partition, partitionProperties, planTime, 
isKeyedTable());
+        identifier,
+        config,
+        partition,
+        partitionProperties,
+        planTime,
+        isKeyedTable(),
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime);
   }
 
   protected static class MixedIcebergPartitionEvaluator extends 
CommonPartitionEvaluator {
@@ -110,12 +128,16 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
     private final boolean reachBaseRefreshInterval;
 
     public MixedIcebergPartitionEvaluator(
-        TableRuntime tableRuntime,
+        ServerTableIdentifier identifier,
+        OptimizingConfig config,
         Pair<Integer, StructLike> partition,
         Map<String, String> partitionProperties,
         long planTime,
-        boolean keyedTable) {
-      super(tableRuntime, partition, planTime);
+        boolean keyedTable,
+        long lastMinorOptimizingTime,
+        long lastFullOptimizingTime) {
+      super(
+          identifier, config, partition, planTime, lastMinorOptimizingTime, 
lastFullOptimizingTime);
       this.keyedTable = keyedTable;
       String optimizedTime = 
partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME);
       long lastBaseOptimizedTime = optimizedTime == null ? 0 : 
Long.parseLong(optimizedTime);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
index f404592fc..5914d57bb 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
@@ -18,7 +18,9 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.hive.table.SupportHive;
 import org.apache.amoro.hive.utils.TableTypeUtil;
 import org.apache.amoro.server.optimizing.scan.IcebergTableFileScanHelper;
@@ -59,25 +61,44 @@ public class OptimizingEvaluator {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(OptimizingEvaluator.class);
 
+  protected final ServerTableIdentifier identifier;
+  protected final OptimizingConfig config;
   protected final MixedTable mixedTable;
-  protected final TableRuntime tableRuntime;
   protected final TableSnapshot currentSnapshot;
+  protected final long lastFullOptimizingTime;
+  protected final long lastMinorOptimizingTime;
   protected final int maxPendingPartitions;
   protected boolean isInitialized = false;
-
   protected Map<String, PartitionEvaluator> needOptimizingPlanMap = 
Maps.newHashMap();
   protected Map<String, PartitionEvaluator> partitionPlanMap = 
Maps.newHashMap();
 
-  public OptimizingEvaluator(
+  public static OptimizingEvaluator createOptimizingEvaluator(
       TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
-    this.tableRuntime = tableRuntime;
-    this.mixedTable = table;
-    this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
-    this.maxPendingPartitions = maxPendingPartitions;
+    return new OptimizingEvaluator(
+        tableRuntime.getTableIdentifier(),
+        tableRuntime.getOptimizingConfig(),
+        table,
+        IcebergTableUtil.getSnapshot(table, tableRuntime),
+        maxPendingPartitions,
+        tableRuntime.getLastMinorOptimizingTime(),
+        tableRuntime.getLastFullOptimizingTime());
   }
 
-  public TableRuntime getTableRuntime() {
-    return tableRuntime;
+  public OptimizingEvaluator(
+      ServerTableIdentifier identifier,
+      OptimizingConfig config,
+      MixedTable table,
+      TableSnapshot currentSnapshot,
+      int maxPendingPartitions,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    this.identifier = identifier;
+    this.config = config;
+    this.mixedTable = table;
+    this.currentSnapshot = currentSnapshot;
+    this.maxPendingPartitions = maxPendingPartitions;
+    this.lastFullOptimizingTime = lastFullOptimizingTime;
+    this.lastMinorOptimizingTime = lastMinorOptimizingTime;
   }
 
   protected void initEvaluator() {
@@ -150,25 +171,37 @@ public class OptimizingEvaluator {
 
   protected PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> 
partition) {
     if (TableFormat.ICEBERG.equals(mixedTable.format())) {
-      return new CommonPartitionEvaluator(tableRuntime, partition, 
System.currentTimeMillis());
+      return new CommonPartitionEvaluator(
+          identifier,
+          config,
+          partition,
+          System.currentTimeMillis(),
+          lastMinorOptimizingTime,
+          lastFullOptimizingTime);
     } else {
       Map<String, String> partitionProperties = partitionProperties(partition);
       if (TableTypeUtil.isHive(mixedTable)) {
         String hiveLocation = (((SupportHive) mixedTable).hiveLocation());
         return new MixedHivePartitionPlan.MixedHivePartitionEvaluator(
-            tableRuntime,
+            identifier,
+            config,
             partition,
             partitionProperties,
             hiveLocation,
             System.currentTimeMillis(),
-            mixedTable.isKeyedTable());
+            mixedTable.isKeyedTable(),
+            lastMinorOptimizingTime,
+            lastFullOptimizingTime);
       } else {
         return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(
-            tableRuntime,
+            identifier,
+            config,
             partition,
             partitionProperties,
             System.currentTimeMillis(),
-            mixedTable.isKeyedTable());
+            mixedTable.isKeyedTable(),
+            lastMinorOptimizingTime,
+            lastFullOptimizingTime);
       }
     }
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
index cf6fc9e71..13bb8f1d7 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
@@ -18,7 +18,9 @@
 
 package org.apache.amoro.server.optimizing.plan;
 
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.hive.table.SupportHive;
 import org.apache.amoro.hive.utils.TableTypeUtil;
 import org.apache.amoro.server.AmoroServiceConstants;
@@ -26,6 +28,8 @@ import org.apache.amoro.server.optimizing.OptimizingType;
 import org.apache.amoro.server.optimizing.RewriteStageTask;
 import org.apache.amoro.server.table.KeyedTableSnapshot;
 import org.apache.amoro.server.table.TableRuntime;
+import org.apache.amoro.server.table.TableSnapshot;
+import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.utils.ExpressionUtil;
@@ -49,24 +53,21 @@ public class OptimizingPlanner extends OptimizingEvaluator {
   private static final Logger LOG = 
LoggerFactory.getLogger(OptimizingPlanner.class);
 
   private final Expression partitionFilter;
-
   protected long processId;
   private final double availableCore;
   private final long planTime;
   private OptimizingType optimizingType;
   private final PartitionPlannerFactory partitionPlannerFactory;
   private List<RewriteStageTask> tasks;
-
   private List<AbstractPartitionPlan> actualPartitionPlans;
   private final long maxInputSizePerThread;
 
-  public OptimizingPlanner(
+  public static OptimizingPlanner createOptimizingPlanner(
       TableRuntime tableRuntime,
       MixedTable table,
       double availableCore,
       long maxInputSizePerThread) {
-    super(tableRuntime, table, Integer.MAX_VALUE);
-    this.partitionFilter =
+    Expression partitionFilter =
         tableRuntime.getPendingInput() == null
             ? Expressions.alwaysTrue()
             : 
tableRuntime.getPendingInput().getPartitions().entrySet().stream()
@@ -76,10 +77,52 @@ public class OptimizingPlanner extends OptimizingEvaluator {
                             table, entry.getKey(), entry.getValue()))
                 .reduce(Expressions::or)
                 .orElse(Expressions.alwaysTrue());
+    long planTime = System.currentTimeMillis();
+
+    return new OptimizingPlanner(
+        tableRuntime.getTableIdentifier(),
+        tableRuntime.getOptimizingConfig(),
+        table,
+        IcebergTableUtil.getSnapshot(table, tableRuntime),
+        partitionFilter,
+        Math.max(tableRuntime.getNewestProcessId() + 1, planTime),
+        availableCore,
+        maxInputSizePerThread,
+        tableRuntime.getLastMinorOptimizingTime(),
+        tableRuntime.getLastFullOptimizingTime());
+  }
+
+  public OptimizingPlanner(
+      ServerTableIdentifier identifier,
+      OptimizingConfig config,
+      MixedTable table,
+      TableSnapshot snapshot,
+      Expression partitionFilter,
+      long processId,
+      double availableCore,
+      long maxInputSizePerThread,
+      long lastMinorOptimizingTime,
+      long lastFullOptimizingTime) {
+    super(
+        identifier,
+        config,
+        table,
+        snapshot,
+        Integer.MAX_VALUE,
+        lastMinorOptimizingTime,
+        lastFullOptimizingTime);
+    this.partitionFilter = partitionFilter;
     this.availableCore = availableCore;
     this.planTime = System.currentTimeMillis();
-    this.processId = Math.max(tableRuntime.getNewestProcessId() + 1, planTime);
-    this.partitionPlannerFactory = new PartitionPlannerFactory(mixedTable, 
tableRuntime, planTime);
+    this.processId = processId;
+    this.partitionPlannerFactory =
+        new PartitionPlannerFactory(
+            identifier,
+            config,
+            mixedTable,
+            planTime,
+            lastMinorOptimizingTime,
+            lastFullOptimizingTime);
     this.maxInputSizePerThread = maxInputSizePerThread;
   }
 
@@ -150,7 +193,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
       initEvaluator();
     }
     if (!super.isNecessary()) {
-      LOG.debug("Table {} skip planning", tableRuntime.getTableIdentifier());
+      LOG.debug("Table {} skip planning", identifier);
       return cacheAndReturnTasks(Collections.emptyList());
     }
 
@@ -188,7 +231,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
     long endTime = System.nanoTime();
     LOG.info(
         "{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms 
maxInputSize {} actualInputSize {}",
-        tableRuntime.getTableIdentifier(),
+        identifier,
         getOptimizingType(),
         tasks.size(),
         endTime - startTime,
@@ -216,16 +259,27 @@ public class OptimizingPlanner extends 
OptimizingEvaluator {
   }
 
   private static class PartitionPlannerFactory {
+    protected final OptimizingConfig config;
+    protected final ServerTableIdentifier identifier;
+    protected final long lastMinorOptimizingTime;
+    protected final long lastFullOptimizingTime;
     private final MixedTable mixedTable;
-    private final TableRuntime tableRuntime;
     private final String hiveLocation;
     private final long planTime;
 
     public PartitionPlannerFactory(
-        MixedTable mixedTable, TableRuntime tableRuntime, long planTime) {
+        ServerTableIdentifier identifier,
+        OptimizingConfig config,
+        MixedTable mixedTable,
+        long planTime,
+        long lastMinorOptimizingTime,
+        long lastFullOptimizingTime) {
+      this.identifier = identifier;
+      this.config = config;
       this.mixedTable = mixedTable;
-      this.tableRuntime = tableRuntime;
       this.planTime = planTime;
+      this.lastFullOptimizingTime = lastFullOptimizingTime;
+      this.lastMinorOptimizingTime = lastMinorOptimizingTime;
       if (TableTypeUtil.isHive(mixedTable)) {
         this.hiveLocation = (((SupportHive) mixedTable).hiveLocation());
       } else {
@@ -235,13 +289,34 @@ public class OptimizingPlanner extends OptimizingEvaluator {
 
     public PartitionEvaluator buildPartitionPlanner(Pair<Integer, StructLike> partition) {
       if (TableFormat.ICEBERG.equals(mixedTable.format())) {
-        return new IcebergPartitionPlan(tableRuntime, mixedTable, partition, planTime);
+        return new IcebergPartitionPlan(
+            identifier,
+            config,
+            mixedTable,
+            partition,
+            planTime,
+            lastMinorOptimizingTime,
+            lastFullOptimizingTime);
       } else {
         if (TableTypeUtil.isHive(mixedTable)) {
           return new MixedHivePartitionPlan(
-              tableRuntime, mixedTable, partition, hiveLocation, planTime);
+              identifier,
+              mixedTable,
+              config,
+              partition,
+              hiveLocation,
+              planTime,
+              lastMinorOptimizingTime,
+              lastFullOptimizingTime);
         } else {
-          return new MixedIcebergPartitionPlan(tableRuntime, mixedTable, partition, planTime);
+          return new MixedIcebergPartitionPlan(
+              identifier,
+              mixedTable,
+              config,
+              partition,
+              planTime,
+              lastMinorOptimizingTime,
+              lastFullOptimizingTime);
         }
       }
     }
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index b290323ab..36261a09b 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -53,7 +53,7 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor {
   private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) {
     if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) {
       OptimizingEvaluator evaluator =
-          new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
+          OptimizingEvaluator.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
       if (evaluator.isNecessary()) {
         OptimizingEvaluator.PendingInput pendingInput = evaluator.getOptimizingPendingInput();
         logger.debug(
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
index 41765bf5a..0a8ed6474 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
@@ -192,7 +192,7 @@ public class CompleteOptimizingFlow {
     Mockito.when(tableRuntime.getOptimizingConfig()).thenAnswer(f -> optimizingConfig());
     Mockito.when(tableRuntime.getTableIdentifier())
         .thenReturn(ServerTableIdentifier.of(1L, "a", "b", "c", table.format()));
-    return new OptimizingPlanner(
+    return OptimizingPlanner.createOptimizingPlanner(
         tableRuntime,
         table,
         availableCore,
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
index 82ee12a7c..b1cbfbd67 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
@@ -81,11 +81,14 @@ public class TestHiveKeyedPartitionPlan extends TestKeyedPartitionPlan {
     SupportHive hiveTable = (SupportHive) getMixedTable();
     String hiveLocation = hiveTable.hiveLocation();
     return new MixedHivePartitionPlan(
-        getTableRuntime(),
+        getTableRuntime().getTableIdentifier(),
         getMixedTable(),
+        getTableRuntime().getOptimizingConfig(),
         getPartition(),
         hiveLocation,
-        System.currentTimeMillis());
+        System.currentTimeMillis(),
+        getTableRuntime().getLastMinorOptimizingTime(),
+        getTableRuntime().getLastFullOptimizingTime());
   }
 
   @Test
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
index 6109486e7..23c2039f2 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
@@ -80,11 +80,14 @@ public class TestHiveUnkeyedPartitionPlan extends TestUnkeyedPartitionPlan {
     SupportHive hiveTable = (SupportHive) getMixedTable();
     String hiveLocation = hiveTable.hiveLocation();
     return new MixedHivePartitionPlan(
-        getTableRuntime(),
+        getTableRuntime().getTableIdentifier(),
         getMixedTable(),
+        getTableRuntime().getOptimizingConfig(),
         getPartition(),
         hiveLocation,
-        System.currentTimeMillis());
+        System.currentTimeMillis(),
+        getTableRuntime().getLastMinorOptimizingTime(),
+        getTableRuntime().getLastFullOptimizingTime());
   }
 
   @Test
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
index 857935e2b..98ae4a4fc 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
@@ -27,6 +27,7 @@ import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
 import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.server.optimizing.scan.IcebergTableFileScanHelper;
 import org.apache.amoro.server.optimizing.scan.TableFileScanHelper;
+import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.junit.Test;
@@ -72,8 +73,15 @@ public class TestIcebergPartitionPlan extends TestUnkeyedPartitionPlan {
 
   @Override
   protected AbstractPartitionPlan getPartitionPlan() {
+    TableRuntime tableRuntime = getTableRuntime();
     return new IcebergPartitionPlan(
-        getTableRuntime(), getMixedTable(), getPartition(), System.currentTimeMillis());
+        tableRuntime.getTableIdentifier(),
+        tableRuntime.getOptimizingConfig(),
+        getMixedTable(),
+        getPartition(),
+        System.currentTimeMillis(),
+        tableRuntime.getLastMinorOptimizingTime(),
+        tableRuntime.getLastFullOptimizingTime());
   }
 
   @Override
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
index 6a169d7db..3b7509faf 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -210,7 +210,13 @@ public class TestKeyedPartitionPlan extends MixedTablePlanTestBase {
   @Override
   protected AbstractPartitionPlan getPartitionPlan() {
     return new MixedIcebergPartitionPlan(
-        getTableRuntime(), getMixedTable(), getPartition(), System.currentTimeMillis());
+        getTableRuntime().getTableIdentifier(),
+        getMixedTable(),
+        getTableRuntime().getOptimizingConfig(),
+        getPartition(),
+        System.currentTimeMillis(),
+        getTableRuntime().getLastMinorOptimizingTime(),
+        getTableRuntime().getLastFullOptimizingTime());
   }
 
   @Override
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
index 395aa653d..920de8f64 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
@@ -27,6 +27,8 @@ import org.apache.amoro.server.optimizing.OptimizingTestHelpers;
 import org.apache.amoro.server.optimizing.scan.KeyedTableFileScanHelper;
 import org.apache.amoro.server.optimizing.scan.TableFileScanHelper;
 import org.apache.amoro.server.optimizing.scan.UnkeyedTableFileScanHelper;
+import org.apache.amoro.server.table.TableSnapshot;
+import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -110,7 +112,15 @@ public class TestOptimizingEvaluator extends MixedTablePlanTestBase {
   }
 
   protected OptimizingEvaluator buildOptimizingEvaluator() {
-    return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
+    TableSnapshot snapshot = IcebergTableUtil.getSnapshot(getMixedTable(), tableRuntime);
+    return new OptimizingEvaluator(
+        tableRuntime.getTableIdentifier(),
+        tableRuntime.getOptimizingConfig(),
+        getMixedTable(),
+        snapshot,
+        100,
+        tableRuntime.getLastMinorOptimizingTime(),
+        tableRuntime.getLastFullOptimizingTime());
   }
 
   protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
index b49061608..ae5545f6e 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
@@ -96,7 +96,7 @@ public class TestOptimizingPlanner extends TestOptimizingEvaluator {
 
   @Override
   protected OptimizingPlanner buildOptimizingEvaluator() {
-    return new OptimizingPlanner(
+    return OptimizingPlanner.createOptimizingPlanner(
         getTableRuntime(),
         getMixedTable(),
         1,
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
index 6224bd24e..c240cce26 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
@@ -75,7 +75,13 @@ public class TestUnkeyedPartitionPlan extends MixedTablePlanTestBase {
   @Override
   protected AbstractPartitionPlan getPartitionPlan() {
     return new MixedIcebergPartitionPlan(
-        getTableRuntime(), getMixedTable(), getPartition(), System.currentTimeMillis());
+        getTableRuntime().getTableIdentifier(),
+        getMixedTable(),
+        getTableRuntime().getOptimizingConfig(),
+        getPartition(),
+        System.currentTimeMillis(),
+        getTableRuntime().getLastMinorOptimizingTime(),
+        getTableRuntime().getLastFullOptimizingTime());
   }
 
   @Override


Reply via email to