This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 715320f89 [AMORO-2553] Add delete file cache for Iceberg optimizer 
(#3793)
715320f89 is described below

commit 715320f8908784b86e69ceca269ac5d62ade4238
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Oct 15 17:38:33 2025 +0800

    [AMORO-2553] Add delete file cache for Iceberg optimizer (#3793)
    
    * Add delete cache for iceberg rewrite executor
    
    * Fix some compile errors
    
    * Add merge method to structLikeMap to improve compaction performance
    
    * Access getOrReadEqDeletes via reflection
    
    * Add configuration for delete cache
    
    * Fix a unit test error
    
    * Change delete-cache-enabled to false by default
    
    * Fix optimizer config name -ce
    
    * Fix some issues according to the comments.
---
 .../server/manager/AbstractOptimizerContainer.java |  30 +++-
 .../server/optimizing/UnKeyedTableCommit.java      |  12 +-
 .../optimizing/flow/CompleteOptimizingFlow.java    |   9 +-
 .../checker/FullOptimizingMove2HiveChecker.java    |  10 +-
 .../checker/FullOptimizingWrite2HiveChecker.java   |  10 +-
 .../plan/TestHiveKeyedPartitionPlan.java           |   7 +-
 .../plan/TestHiveUnkeyedPartitionPlan.java         |   7 +-
 .../optimizing/plan/TestIcebergPartitionPlan.java  |   5 +-
 .../optimizing/plan/TestKeyedPartitionPlan.java    |   4 +-
 .../optimizing/plan/TestOptimizingPlanner.java     |   4 +-
 .../optimizing/plan/TestUnkeyedPartitionPlan.java  |   4 +-
 .../java/org/apache/amoro/OptimizerProperties.java |   8 +
 .../amoro/io/reader/CombinedDeleteFilter.java      |  94 +++++-----
 .../org/apache/amoro/io/reader/DeleteCache.java    | 191 +++++++++++++++++++++
 .../reader/GenericCombinedIcebergDataReader.java   |  10 +-
 .../optimizing/AbstractRewriteFilesExecutor.java   |   7 +-
 .../amoro/optimizing/IcebergRewriteExecutor.java   |  11 +-
 .../optimizing/IcebergRewriteExecutorFactory.java  |   6 +-
 .../optimizing/MixedIcebergRewriteExecutor.java    |   9 +-
 .../MixedIcebergRewriteExecutorFactory.java        |   7 +-
 .../optimizing/OptimizingExecutorFactory.java      |   2 +-
 .../optimizing/OptimizingInputProperties.java      | 117 -------------
 .../apache/amoro/optimizing/TaskProperties.java    |  66 +++++++
 .../optimizing/plan/AbstractPartitionPlan.java     |   8 +-
 .../optimizing/plan/IcebergPartitionPlan.java      |  11 +-
 .../optimizing/plan/MixedIcebergPartitionPlan.java |  10 +-
 .../java/org/apache/amoro/utils/map/SimpleMap.java |  23 ++-
 .../apache/amoro/utils/map/StructLikeBaseMap.java  |   7 +
 .../amoro/utils/map/StructLikeMemoryMap.java       |   9 +
 .../apache/amoro/io/TestIcebergCombinedReader.java |  51 ++++--
 .../io/TestIcebergCombinedReaderVariousTypes.java  |   6 +-
 .../optimizing/IcebergRewriteExecutorTest.java     |   5 +-
 .../hive/optimizing/MixedHiveRewriteExecutor.java  |  12 +-
 .../MixedHiveRewriteExecutorFactory.java           |  11 +-
 .../optimizing/plan/MixedHivePartitionPlan.java    |  13 +-
 .../amoro/optimizer/common/OptimizerConfig.java    |  62 ++++++-
 .../amoro/optimizer/common/OptimizerExecutor.java  |  46 +++--
 .../optimizer/common/TestOptimizerExecutor.java    |   5 +-
 docs/admin-guides/managing-optimizers.md           |  14 +-
 39 files changed, 622 insertions(+), 301 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
index 9e8017429..16628406d 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
@@ -23,6 +23,7 @@ import org.apache.amoro.resource.InternalResourceContainer;
 import org.apache.amoro.resource.Resource;
 import org.apache.amoro.resource.ResourceStatus;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.utils.PropertyUtil;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.IOException;
@@ -91,7 +92,7 @@ public abstract class AbstractOptimizerContainer implements 
InternalResourceCont
           .append(" -hb ")
           
.append(resource.getProperties().get(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL));
     }
-    if (org.apache.iceberg.util.PropertyUtil.propertyAsBoolean(
+    if (PropertyUtil.propertyAsBoolean(
         resource.getProperties(),
         OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE,
         OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT)) {
@@ -105,6 +106,33 @@ public abstract class AbstractOptimizerContainer 
implements InternalResourceCont
                 
resource.getProperties().get(OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE));
       }
     }
+    if (PropertyUtil.propertyAsBoolean(
+        resource.getProperties(),
+        OptimizerProperties.OPTIMIZER_CACHE_ENABLED,
+        OptimizerProperties.OPTIMIZER_CACHE_ENABLED_DEFAULT)) {
+      stringBuilder.append(" -ce ");
+      if (resource
+          .getProperties()
+          .containsKey(OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE)) {
+        stringBuilder
+            .append(" -cmts ")
+            .append(
+                
resource.getProperties().get(OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE));
+      }
+      if (resource
+          .getProperties()
+          .containsKey(OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE)) {
+        stringBuilder
+            .append(" -cmes ")
+            .append(
+                
resource.getProperties().get(OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE));
+      }
+      if 
(resource.getProperties().containsKey(OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT))
 {
+        stringBuilder
+            .append(" -ct ")
+            
.append(resource.getProperties().get(OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT));
+      }
+    }
     if (StringUtils.isNotEmpty(resource.getResourceId())) {
       stringBuilder.append(" -id ").append(resource.getResourceId());
     }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
index a7bea9742..d71ae484b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
@@ -31,9 +31,9 @@ import org.apache.amoro.hive.utils.HiveTableUtil;
 import org.apache.amoro.hive.utils.TableTypeUtil;
 import org.apache.amoro.iceberg.Constants;
 import org.apache.amoro.op.SnapshotSummary;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.properties.HiveTableProperties;
 import org.apache.amoro.server.optimizing.TaskRuntime.Status;
 import org.apache.amoro.server.utils.IcebergTableUtil;
@@ -43,6 +43,7 @@ import org.apache.amoro.table.UnkeyedTable;
 import org.apache.amoro.utils.ContentFiles;
 import org.apache.amoro.utils.IcebergThreadPools;
 import org.apache.amoro.utils.MixedTableUtil;
+import org.apache.amoro.utils.PropertyUtil;
 import org.apache.amoro.utils.TableFileUtil;
 import org.apache.amoro.utils.TablePropertyUtil;
 import org.apache.commons.collections.CollectionUtils;
@@ -328,8 +329,13 @@ public class UnKeyedTableCommit {
   }
 
   protected boolean needMoveFile2Hive() {
-    return 
OptimizingInputProperties.parse(tasks.stream().findAny().get().getProperties())
-        .getMoveFile2HiveLocation();
+    return PropertyUtil.propertyAsBoolean(
+        tasks.stream()
+            .findAny()
+            .orElseThrow(() -> new RuntimeException("The tasks is empty"))
+            .getProperties(),
+        TaskProperties.MOVE_FILE_TO_HIVE_LOCATION,
+        false);
   }
 
   protected void correctHiveData(Set<DataFile> addedDataFiles, Set<DeleteFile> 
addedDeleteFiles)
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
index e12c53403..f26773395 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
@@ -32,7 +32,6 @@ import 
org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutor;
 import org.apache.amoro.iceberg.Constants;
 import org.apache.amoro.optimizing.IcebergRewriteExecutor;
 import org.apache.amoro.optimizing.OptimizingExecutor;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.optimizing.RewriteStageTask;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
@@ -45,7 +44,6 @@ import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.utils.MixedDataFiles;
 import org.apache.amoro.utils.TablePropertyUtil;
-import org.apache.amoro.utils.map.StructLikeCollections;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
@@ -213,13 +211,10 @@ public class CompleteOptimizingFlow {
       TaskRuntime<RewriteStageTask> taskRuntime) {
     if (table.format() == TableFormat.ICEBERG) {
       return new IcebergRewriteExecutor(
-          taskRuntime.getTaskDescriptor().getInput(), table, 
StructLikeCollections.DEFAULT);
+          taskRuntime.getTaskDescriptor().getInput(), table, 
taskRuntime.getProperties());
     } else {
       return new MixedHiveRewriteExecutor(
-          taskRuntime.getTaskDescriptor().getInput(),
-          table,
-          StructLikeCollections.DEFAULT,
-          
OptimizingInputProperties.parse(taskRuntime.getProperties()).getOutputDir());
+          taskRuntime.getTaskDescriptor().getInput(), table, 
taskRuntime.getProperties());
     }
   }
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
index 9b99be1d8..dab0707ae 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
@@ -18,13 +18,14 @@
 
 package org.apache.amoro.server.optimizing.flow.checker;
 
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
 import org.apache.amoro.server.optimizing.UnKeyedTableCommit;
 import org.apache.amoro.server.optimizing.flow.view.TableDataView;
 import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.utils.PropertyUtil;
 import org.apache.commons.collections.CollectionUtils;
 
 import javax.annotation.Nullable;
@@ -45,8 +46,9 @@ public class FullOptimizingMove2HiveChecker extends 
AbstractHiveChecker {
       @Nullable UnKeyedTableCommit latestCommit) {
     return CollectionUtils.isNotEmpty(latestTaskDescriptors)
         && latestPlanner.getOptimizingType() == OptimizingType.FULL
-        && OptimizingInputProperties.parse(
-                latestTaskDescriptors.stream().findAny().get().getProperties())
-            .getMoveFile2HiveLocation();
+        && PropertyUtil.propertyAsBoolean(
+            latestTaskDescriptors.stream().findAny().get().getProperties(),
+            TaskProperties.MOVE_FILE_TO_HIVE_LOCATION,
+            false);
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
index 105e68a65..98179406d 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
@@ -18,9 +18,9 @@
 
 package org.apache.amoro.server.optimizing.flow.checker;
 
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
 import org.apache.amoro.server.optimizing.UnKeyedTableCommit;
 import org.apache.amoro.server.optimizing.flow.view.TableDataView;
@@ -45,9 +45,11 @@ public class FullOptimizingWrite2HiveChecker extends 
AbstractHiveChecker {
       @Nullable UnKeyedTableCommit latestCommit) {
     return CollectionUtils.isNotEmpty(latestTaskDescriptors)
         && latestPlanner.getOptimizingType() == OptimizingType.FULL
-        && OptimizingInputProperties.parse(
-                    
latestTaskDescriptors.stream().findAny().get().getProperties())
-                .getOutputDir()
+        && latestTaskDescriptors.stream()
+                .findAny()
+                .get()
+                .getProperties()
+                .get(TaskProperties.OUTPUT_DIR)
             != null;
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
index 412defbd9..387b18231 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
@@ -28,7 +28,7 @@ import org.apache.amoro.hive.catalog.HiveTableTestHelper;
 import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
 import org.apache.amoro.hive.optimizing.plan.MixedHivePartitionPlan;
 import org.apache.amoro.hive.table.SupportHive;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
 import org.apache.amoro.properties.HiveTableProperties;
 import org.apache.amoro.server.optimizing.OptimizingTestHelpers;
@@ -72,7 +72,7 @@ public class TestHiveKeyedPartitionPlan extends 
TestKeyedPartitionPlan {
   @Override
   protected void assertTaskProperties(Map<String, String> expect, Map<String, 
String> actual) {
     actual = Maps.newHashMap(actual);
-    String outputDir = actual.remove(OptimizingInputProperties.OUTPUT_DIR);
+    String outputDir = actual.remove(TaskProperties.OUTPUT_DIR);
     if (outputDir != null) {
       Assert.assertTrue(Long.parseLong(outputDir.split("_")[1]) > 0);
     }
@@ -136,8 +136,7 @@ public class TestHiveKeyedPartitionPlan extends 
TestKeyedPartitionPlan {
   protected Map<String, String> buildTaskProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(
-        OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
-        MixedHiveRewriteExecutorFactory.class.getName());
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, 
MixedHiveRewriteExecutorFactory.class.getName());
     return properties;
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
index 82097d92c..2d0e4cfc1 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
@@ -27,7 +27,7 @@ import org.apache.amoro.hive.catalog.HiveTableTestHelper;
 import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
 import org.apache.amoro.hive.optimizing.plan.MixedHivePartitionPlan;
 import org.apache.amoro.hive.table.SupportHive;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
 import org.apache.amoro.properties.HiveTableProperties;
 import org.apache.amoro.server.optimizing.OptimizingTestHelpers;
@@ -71,7 +71,7 @@ public class TestHiveUnkeyedPartitionPlan extends 
TestUnkeyedPartitionPlan {
   @Override
   protected void assertTaskProperties(Map<String, String> expect, Map<String, 
String> actual) {
     actual = Maps.newHashMap(actual);
-    String outputDir = actual.remove(OptimizingInputProperties.OUTPUT_DIR);
+    String outputDir = actual.remove(TaskProperties.OUTPUT_DIR);
     if (outputDir != null) {
       Assert.assertTrue(Long.parseLong(outputDir.split("_")[1]) > 0);
     }
@@ -136,8 +136,7 @@ public class TestHiveUnkeyedPartitionPlan extends 
TestUnkeyedPartitionPlan {
   protected Map<String, String> buildTaskProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(
-        OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
-        MixedHiveRewriteExecutorFactory.class.getName());
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, 
MixedHiveRewriteExecutorFactory.class.getName());
     return properties;
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
index 2effe4c84..3d67b62c8 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
 import org.apache.amoro.optimizing.plan.IcebergPartitionPlan;
 import org.apache.amoro.optimizing.scan.IcebergTableFileScanHelper;
@@ -95,8 +95,7 @@ public class TestIcebergPartitionPlan extends 
TestUnkeyedPartitionPlan {
   protected Map<String, String> buildTaskProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(
-        OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
-        IcebergRewriteExecutorFactory.class.getName());
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, 
IcebergRewriteExecutorFactory.class.getName());
     return properties;
   }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
index 4a4b38472..1984f31d8 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -25,8 +25,8 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.data.ChangeAction;
 import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
 import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
 import org.apache.amoro.optimizing.scan.KeyedTableFileScanHelper;
@@ -235,7 +235,7 @@ public class TestKeyedPartitionPlan extends 
MixedTablePlanTestBase {
   protected Map<String, String> buildTaskProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(
-        OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
         MixedIcebergRewriteExecutorFactory.class.getName());
     return properties;
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
index be189c29b..051798b7b 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
@@ -25,9 +25,9 @@ import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
 import org.apache.amoro.optimizing.scan.TableFileScanHelper;
 import org.apache.amoro.server.utils.IcebergTableUtil;
@@ -113,7 +113,7 @@ public class TestOptimizingPlanner extends 
TestOptimizingEvaluator {
   protected Map<String, String> buildTaskProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(
-        OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
         MixedIcebergRewriteExecutorFactory.class.getName());
     return properties;
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
index 33d7aa33b..558ace42b 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
 import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
 import org.apache.amoro.optimizing.scan.TableFileScanHelper;
@@ -106,7 +106,7 @@ public class TestUnkeyedPartitionPlan extends 
MixedTablePlanTestBase {
   protected Map<String, String> buildTaskProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(
-        OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
         MixedIcebergRewriteExecutorFactory.class.getName());
     return properties;
   }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java 
b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
index f83912d9c..bea9b90f9 100644
--- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
@@ -39,4 +39,12 @@ public class OptimizerProperties {
   public static final String OPTIMIZER_MEMORY_STORAGE_SIZE = 
"memory-storage-size";
   public static final String MAX_INPUT_FILE_SIZE_PER_THREAD = 
"max-input-file-size-per-thread";
   public static final Long MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT = 512 * 1024 
* 1024L; // 512MB
+  public static final String OPTIMIZER_CACHE_ENABLED = "cache-enabled";
+  public static final boolean OPTIMIZER_CACHE_ENABLED_DEFAULT = false;
+  public static final String OPTIMIZER_CACHE_MAX_ENTRY_SIZE = 
"cache-max-entry-size";
+  public static final String OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT = "64mb";
+  public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE = 
"cache-max-total-size";
+  public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
+  public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
+  public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index 830976ea2..a18b2ab81 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -23,7 +23,6 @@ import org.apache.amoro.io.CloseablePredicate;
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
@@ -31,6 +30,7 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
 import org.apache.amoro.utils.ContentFiles;
+import org.apache.amoro.utils.DynMethods;
 import org.apache.amoro.utils.map.StructLikeBaseMap;
 import org.apache.amoro.utils.map.StructLikeCollections;
 import org.apache.iceberg.Accessor;
@@ -41,6 +41,8 @@ import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
@@ -64,11 +66,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -91,6 +94,12 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
   private static final Accessor<StructLike> POSITION_ACCESSOR =
       
POS_DELETE_SCHEMA.accessorForField(MetadataColumns.DELETE_FILE_POS.fieldId());
 
+  // BaseDeleteLoader#getOrReadEqDeletes is a private method, accessed it via 
reflection
+  private static final DynMethods.UnboundMethod READ_EQ_DELETES_METHOD =
+      DynMethods.builder("getOrReadEqDeletes")
+          .hiddenImpl(BaseDeleteLoader.class, DeleteFile.class, Schema.class)
+          .build();
+
   @VisibleForTesting public static long FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT 
= 1000000L;
 
   private final RewriteFilesInput input;
@@ -117,11 +126,14 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
 
   private final long dataRecordCnt;
   private final boolean filterEqDelete;
+  private final DeleteLoader deleteLoader;
+  private final String deleteGroup;
 
   protected CombinedDeleteFilter(
       RewriteFilesInput rewriteFilesInput,
       Schema tableSchema,
-      StructLikeCollections structLikeCollections) {
+      StructLikeCollections structLikeCollections,
+      String deleteGroup) {
     this.input = rewriteFilesInput;
     this.dataRecordCnt =
         
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
@@ -157,6 +169,14 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
       this.structLikeCollections = structLikeCollections;
     }
     this.filterEqDelete = filterEqDelete();
+    this.deleteGroup = deleteGroup;
+    if (Boolean.parseBoolean(
+        System.getProperty(
+            DeleteCache.DELETE_CACHE_ENABLED, 
DeleteCache.DELETE_CACHE_ENABLED_DEFAULT))) {
+      this.deleteLoader = new CachingDeleteLoader(this::getInputFile);
+    } else {
+      this.deleteLoader = new BaseDeleteLoader(this::getInputFile);
+    }
   }
 
   /**
@@ -311,44 +331,22 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
     Schema deleteSchema = deleteSchemaEntry.getValue();
     Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
 
-    InternalRecordWrapper internalRecordWrapper =
-        new InternalRecordWrapper(deleteSchema.asStruct());
-
-    CloseableIterable<RecordWithLsn> deleteRecords =
-        CloseableIterable.transform(
-            CloseableIterable.concat(
-                Iterables.transform(
-                    eqDeletes,
-                    s ->
-                        CloseableIterable.transform(
-                            openFile(s, deleteSchema),
-                            r -> new RecordWithLsn(s.dataSequenceNumber(), 
r)))),
-            RecordWithLsn::recordCopy);
-
     StructLikeBaseMap<Long> structLikeMap =
         structLikeCollections.createStructLikeMap(deleteSchema.asStruct());
+    structMapCloseable.add(structLikeMap);
 
-    // init map
-    try (CloseableIterable<RecordWithLsn> deletes = deleteRecords) {
-      Iterator<RecordWithLsn> it =
-          getFileIO() == null ? deletes.iterator() : 
getFileIO().doAs(deletes::iterator);
-      while (it.hasNext()) {
-        RecordWithLsn recordWithLsn = it.next();
-        StructLike deletePK = 
internalRecordWrapper.copyFor(recordWithLsn.getRecord());
-        if (filterEqDelete && !bloomFilter.mightContain(deletePK)) {
+    for (DeleteFile deleteFile : eqDeletes) {
+      for (StructLike record : openEqualityDeletes(deleteFile, deleteSchema)) {
+        if (filterEqDelete && !bloomFilter.mightContain(record)) {
           continue;
         }
-        Long lsn = recordWithLsn.getLsn();
-        Long old = structLikeMap.get(deletePK);
-        if (old == null || old.compareTo(lsn) <= 0) {
-          structLikeMap.put(deletePK, lsn);
-        }
+        Long lsn = deleteFile.dataSequenceNumber();
+        structLikeMap.merge(record, lsn, Math::max);
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
     }
-    structMapCloseable.add(structLikeMap);
 
+    InternalRecordWrapper internalRecordWrapper =
+        new InternalRecordWrapper(deleteSchema.asStruct());
     return structForDelete -> {
       StructProjection deleteProjection =
           StructProjection.create(requiredSchema, 
deleteSchema).wrap(structForDelete.getPk());
@@ -444,6 +442,10 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
     return openFile(file, POS_DELETE_SCHEMA);
   }
 
+  private Iterable<StructLike> openEqualityDeletes(DeleteFile file, Schema 
deleteSchema) {
+    return READ_EQ_DELETES_METHOD.invoke(deleteLoader, file, deleteSchema);
+  }
+
   private CloseableIterable<Record> openFile(ContentFile<?> contentFile, 
Schema deleteSchema) {
     InputFile input = getInputFile(contentFile);
     switch (contentFile.format()) {
@@ -477,26 +479,22 @@ public abstract class CombinedDeleteFilter<T extends 
StructLike> {
     }
   }
 
-  static class RecordWithLsn {
-    private final Long lsn;
-    private Record record;
-
-    public RecordWithLsn(Long lsn, Record record) {
-      this.lsn = lsn;
-      this.record = record;
-    }
+  class CachingDeleteLoader extends BaseDeleteLoader {
+    private final DeleteCache cache;
 
-    public Long getLsn() {
-      return lsn;
+    public CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+      super(loadInputFile);
+      this.cache = DeleteCache.getInstance();
     }
 
-    public Record getRecord() {
-      return record;
+    @Override
+    protected boolean canCache(long size) {
+      return cache != null && size < cache.maxEntrySize();
     }
 
-    public RecordWithLsn recordCopy() {
-      record = record.copy();
-      return this;
+    @Override
+    protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long 
valueSize) {
+      return cache.getOrLoad(CombinedDeleteFilter.this.deleteGroup, key, 
valueSupplier, valueSize);
     }
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/DeleteCache.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/DeleteCache.java
new file mode 100644
index 000000000..33d50da02
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/DeleteCache.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.io.reader;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.amoro.config.ConfigHelpers;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.utils.MemorySize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * A cache for reducing the computation and IO overhead for iceberg delete 
files.
+ *
+ * <p>The cache is configured and controlled through JVM system properties. It 
supports both limits
+ * on the total cache size and maximum size for individual entries. 
Additionally, it implements
+ * automatic eviction of entries after a specified duration of inactivity.
+ *
+ * <p>The cache is accessed and populated via {@link #getOrLoad(String, 
String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided 
supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it 
must be associated
+ * with a particular group ID. Once the group is no longer needed, it is 
recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of 
relying on automatic
+ * eviction.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one 
cache exists per JVM.
+ */
+public class DeleteCache {
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteCache.class);
+  public static final String DELETE_CACHE_ENABLED = "delete-cache-enabled";
+  public static final String DELETE_CACHE_ENABLED_DEFAULT = "false";
+  public static final String DELETE_CACHE_MAX_ENTRY_SIZE = 
"delete-cache-max-entry-size";
+  public static final String DELETE_CACHE_MAX_ENTRY_SIZE_DEFAULT = "64mb";
+  public static final String DELETE_CACHE_MAX_TOTAL_SIZE = 
"delete-cache-max-total-size";
+  public static final String DELETE_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
+  public static final String DELETE_CACHE_TIMEOUT = "delete-cache-timeout";
+  public static final String DELETE_CACHE_TIMEOUT_DEFAULT = "10min";
+  private static final int MAX_GROUPS = 5;
+  private static volatile DeleteCache INSTANCE;
+
+  private final Duration timeout;
+  private final long maxEntrySize;
+  private final long maxTotalSize;
+  private final List<String> groups = new CopyOnWriteArrayList<>();
+  private volatile Cache<String, CacheValue> state;
+
+  private DeleteCache(Duration timeout, long maxEntrySize, long maxTotalSize) {
+    this.timeout = timeout;
+    this.maxEntrySize = maxEntrySize;
+    this.maxTotalSize = maxTotalSize;
+  }
+
+  public static DeleteCache getInstance() {
+    if (INSTANCE == null) {
+      long maxEntrySize =
+          MemorySize.parseBytes(
+              System.getProperty(DELETE_CACHE_MAX_ENTRY_SIZE, 
DELETE_CACHE_MAX_ENTRY_SIZE_DEFAULT));
+      long maxTotalSize =
+          MemorySize.parseBytes(
+              System.getProperty(DELETE_CACHE_MAX_TOTAL_SIZE, 
DELETE_CACHE_MAX_TOTAL_SIZE_DEFAULT));
+      Duration timeout =
+          ConfigHelpers.TimeUtils.parseDuration(
+              System.getProperty(DELETE_CACHE_TIMEOUT, 
DELETE_CACHE_TIMEOUT_DEFAULT));
+      initialInstance(timeout, maxEntrySize, maxTotalSize);
+    }
+    return INSTANCE;
+  }
+
+  public static synchronized void initialInstance(
+      Duration timeout, long maxEntrySize, long maxTotalSize) {
+    if (INSTANCE == null) {
+      INSTANCE = new DeleteCache(timeout, maxEntrySize, maxTotalSize);
+    } else {
+      LOG.warn("Cache is already initialed.");
+    }
+  }
+
+  public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier, 
long valueSize) {
+    if (valueSize > maxEntrySize) {
+      LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize, 
maxEntrySize);
+      return valueSupplier.get();
+    }
+    if (!groups.contains(group)) {
+      if (groups.size() > MAX_GROUPS) {
+        String removed = groups.remove(groups.size() - 1);
+        if (removed != null) {
+          invalidate(removed);
+        }
+      }
+      groups.add(group);
+    }
+    String internalKey = group + "_" + key;
+    CacheValue value = state().get(internalKey, loadFunc(valueSupplier, 
valueSize));
+    Preconditions.checkNotNull(value, "Loaded value must not be null");
+    return value.get();
+  }
+
+  public void invalidate(String group) {
+    if (state != null && group != null) {
+      List<String> internalKeys = findInternalKeys(group);
+      LOG.info("Invalidating {} keys associated with {}", internalKeys.size(), 
group);
+      internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+      LOG.info("Current cache stats {}", state.stats());
+    }
+  }
+
+  private List<String> findInternalKeys(String group) {
+    return state.asMap().keySet().stream()
+        .filter(internalKey -> internalKey.startsWith(group))
+        .collect(Collectors.toList());
+  }
+
+  private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier, 
long valueSize) {
+    return key -> {
+      long start = System.currentTimeMillis();
+      V value = valueSupplier.get();
+      long end = System.currentTimeMillis();
+      LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end - 
start));
+      return new CacheValue(value, valueSize);
+    };
+  }
+
+  private Cache<String, CacheValue> state() {
+    if (state == null) {
+      synchronized (this) {
+        if (state == null) {
+          LOG.info("Initializing cache state");
+          this.state = initState();
+        }
+      }
+    }
+    return state;
+  }
+
+  private Cache<String, CacheValue> initState() {
+    return Caffeine.newBuilder()
+        .expireAfterAccess(timeout)
+        .maximumWeight(maxTotalSize)
+        .weigher((key, value) -> ((CacheValue) value).weight())
+        .recordStats()
+        .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})", 
key, cause))
+        .build();
+  }
+
+  public long maxEntrySize() {
+    return maxEntrySize;
+  }
+
+  static class CacheValue {
+    private final Object value;
+    private final long size;
+
+    CacheValue(Object value, long size) {
+      this.value = value;
+      this.size = size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <V> V get() {
+      return (V) value;
+    }
+
+    public int weight() {
+      return (int) Math.min(size, Integer.MAX_VALUE);
+    }
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
index c16d23394..7da19ee9d 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
@@ -84,7 +84,8 @@ public class GenericCombinedIcebergDataReader implements 
OptimizingDataReader {
       BiFunction<Type, Object, Object> convertConstant,
       boolean reuseContainer,
       StructLikeCollections structLikeCollections,
-      RewriteFilesInput rewriteFilesInput) {
+      RewriteFilesInput rewriteFilesInput,
+      String deleteGroup) {
     this.tableSchema = tableSchema;
     this.spec = spec;
     this.encryptionManager = encryptionManager;
@@ -95,7 +96,7 @@ public class GenericCombinedIcebergDataReader implements 
OptimizingDataReader {
     this.reuseContainer = reuseContainer;
     this.input = rewriteFilesInput;
     this.deleteFilter =
-        new GenericDeleteFilter(rewriteFilesInput, tableSchema, 
structLikeCollections);
+        new GenericDeleteFilter(rewriteFilesInput, tableSchema, 
structLikeCollections, deleteGroup);
   }
 
   @Override
@@ -301,8 +302,9 @@ public class GenericCombinedIcebergDataReader implements 
OptimizingDataReader {
     public GenericDeleteFilter(
         RewriteFilesInput rewriteFilesInput,
         Schema tableSchema,
-        StructLikeCollections structLikeCollections) {
-      super(rewriteFilesInput, tableSchema, structLikeCollections);
+        StructLikeCollections structLikeCollections,
+        String deleteGroup) {
+      super(rewriteFilesInput, tableSchema, structLikeCollections, 
deleteGroup);
     }
 
     @Override
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
index 931fadc07..4446619f9 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
@@ -68,6 +68,8 @@ public abstract class AbstractRewriteFilesExecutor
 
   protected final RewriteFilesInput input;
 
+  protected final Map<String, String> properties;
+
   protected MixedTable table;
 
   protected OptimizingDataReader dataReader;
@@ -77,11 +79,12 @@ public abstract class AbstractRewriteFilesExecutor
   protected StructLikeCollections structLikeCollections;
 
   public AbstractRewriteFilesExecutor(
-      RewriteFilesInput input, MixedTable table, StructLikeCollections 
structLikeCollections) {
+      RewriteFilesInput input, MixedTable table, Map<String, String> 
properties) {
     this.input = input;
     this.table = table;
     this.io = table.io();
-    this.structLikeCollections = structLikeCollections;
+    this.properties = properties;
+    this.structLikeCollections = 
TaskProperties.getStructLikeCollections(properties);
     dataReader = dataReader();
   }
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
index 10cbe1a26..606be5754 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
@@ -22,7 +22,6 @@ import 
org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
 import org.apache.amoro.io.writer.GenericIcebergPartitionedFanoutWriter;
 import org.apache.amoro.io.writer.IcebergFanoutPosDeleteWriter;
 import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.utils.map.StructLikeCollections;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.TableProperties;
@@ -37,18 +36,19 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.util.UUID;
 
 /** OptimizingExecutor for iceberg format. */
 public class IcebergRewriteExecutor extends AbstractRewriteFilesExecutor {
-
   public IcebergRewriteExecutor(
-      RewriteFilesInput input, MixedTable table, StructLikeCollections 
structLikeCollections) {
-    super(input, table, structLikeCollections);
+      RewriteFilesInput input, MixedTable table, Map<String, String> 
properties) {
+    super(input, table, properties);
   }
 
   @Override
   protected OptimizingDataReader dataReader() {
+    String processId = TaskProperties.getProcessId(properties);
     return new GenericCombinedIcebergDataReader(
         io,
         table.schema(),
@@ -59,7 +59,8 @@ public class IcebergRewriteExecutor extends 
AbstractRewriteFilesExecutor {
         IdentityPartitionConverters::convertConstant,
         false,
         structLikeCollections,
-        input);
+        input,
+        processId);
   }
 
   @Override
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
index 48be84c01..fd60ab9ef 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
@@ -32,9 +32,7 @@ public class IcebergRewriteExecutorFactory implements 
OptimizingExecutorFactory<
   }
 
   @Override
-  public OptimizingExecutor createExecutor(RewriteFilesInput input) {
-    OptimizingInputProperties optimizingConfig = 
OptimizingInputProperties.parse(properties);
-    return new IcebergRewriteExecutor(
-        input, input.getTable(), optimizingConfig.getStructLikeCollections());
+  public OptimizingExecutor<RewriteFilesOutput> 
createExecutor(RewriteFilesInput input) {
+    return new IcebergRewriteExecutor(input, input.getTable(), properties);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
index 02861ba67..f274e300a 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
@@ -22,7 +22,6 @@ import org.apache.amoro.data.PrimaryKeyedFile;
 import org.apache.amoro.io.writer.GenericTaskWriters;
 import org.apache.amoro.io.writer.MixedTreeNodePosDeleteWriter;
 import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.utils.map.StructLikeCollections;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.DeleteWriteResult;
@@ -31,15 +30,13 @@ import org.apache.iceberg.io.FileWriter;
 import org.apache.iceberg.io.TaskWriter;
 
 import java.util.List;
+import java.util.Map;
 
 public class MixedIcebergRewriteExecutor extends AbstractRewriteFilesExecutor {
 
   public MixedIcebergRewriteExecutor(
-      RewriteFilesInput input,
-      MixedTable table,
-      StructLikeCollections structLikeCollections,
-      String outputDir) {
-    super(input, table, structLikeCollections);
+      RewriteFilesInput input, MixedTable table, Map<String, String> 
properties) {
+    super(input, table, properties);
   }
 
   @Override
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
index 29b92c9bb..0728f7aee 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
@@ -33,11 +33,6 @@ public class MixedIcebergRewriteExecutorFactory
 
   @Override
   public OptimizingExecutor<RewriteFilesOutput> 
createExecutor(RewriteFilesInput input) {
-    OptimizingInputProperties optimizingConfig = 
OptimizingInputProperties.parse(properties);
-    return new MixedIcebergRewriteExecutor(
-        input,
-        input.getTable(),
-        optimizingConfig.getStructLikeCollections(),
-        optimizingConfig.getOutputDir());
+    return new MixedIcebergRewriteExecutor(input, input.getTable(), 
properties);
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
index c408f970b..e0dfe2f8f 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
@@ -32,5 +32,5 @@ public interface OptimizingExecutorFactory<I extends 
TableOptimizing.OptimizingI
   void initialize(Map<String, String> properties);
 
   /** Create factory by input */
-  OptimizingExecutor createExecutor(I input);
+  OptimizingExecutor<?> createExecutor(I input);
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingInputProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingInputProperties.java
deleted file mode 100644
index b48875029..000000000
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingInputProperties.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.optimizing;
-
-import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.utils.map.StructLikeCollections;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class OptimizingInputProperties {
-
-  public static final String ENABLE_SPILL_MAP = "enable_spill_map";
-
-  public static final String MAX_IN_MEMORY_SIZE_IN_BYTES = 
"max_size_in_memory";
-
-  public static final String SPILL_MAP_PATH = "spill_map_path";
-
-  public static final String OUTPUT_DIR = "output_location";
-
-  public static final String MOVE_FILE_TO_HIVE_LOCATION = 
"move-files-to-hive-location";
-
-  public static final String TASK_EXECUTOR_FACTORY_IMPL = 
"task-executor-factory-impl";
-
-  private final Map<String, String> properties;
-
-  private OptimizingInputProperties(Map<String, String> properties) {
-    this.properties = Maps.newHashMap(properties);
-  }
-
-  public OptimizingInputProperties() {
-    properties = new HashMap<>();
-  }
-
-  public static OptimizingInputProperties parse(Map<String, String> 
properties) {
-    return new OptimizingInputProperties(properties);
-  }
-
-  public OptimizingInputProperties enableSpillMap() {
-    properties.put(ENABLE_SPILL_MAP, "true");
-    return this;
-  }
-
-  public OptimizingInputProperties setMaxSizeInMemory(long maxSizeInMemory) {
-    properties.put(MAX_IN_MEMORY_SIZE_IN_BYTES, 
String.valueOf(maxSizeInMemory));
-    return this;
-  }
-
-  public OptimizingInputProperties setSpillMapPath(String path) {
-    properties.put(SPILL_MAP_PATH, path);
-    return this;
-  }
-
-  public OptimizingInputProperties setOutputDir(String outputDir) {
-    properties.put(OUTPUT_DIR, outputDir);
-    return this;
-  }
-
-  public OptimizingInputProperties setExecutorFactoryImpl(String 
executorFactoryImpl) {
-    properties.put(TASK_EXECUTOR_FACTORY_IMPL, executorFactoryImpl);
-    return this;
-  }
-
-  public OptimizingInputProperties needMoveFile2HiveLocation() {
-    properties.put(MOVE_FILE_TO_HIVE_LOCATION, "true");
-    return this;
-  }
-
-  public StructLikeCollections getStructLikeCollections() {
-    String enableSpillMapStr = properties.get(ENABLE_SPILL_MAP);
-    boolean enableSpillMap = Boolean.parseBoolean(enableSpillMapStr);
-
-    String maxInMemoryStr = properties.get(MAX_IN_MEMORY_SIZE_IN_BYTES);
-    Long maxInMemory = maxInMemoryStr == null ? null : 
Long.parseLong(maxInMemoryStr);
-
-    String spillMapPath = properties.get(SPILL_MAP_PATH);
-
-    return new StructLikeCollections(enableSpillMap, maxInMemory, 
spillMapPath);
-  }
-
-  public String getOutputDir() {
-    return properties.get(OUTPUT_DIR);
-  }
-
-  public String getExecutorFactoryImpl() {
-    return properties.get(TASK_EXECUTOR_FACTORY_IMPL);
-  }
-
-  public boolean getMoveFile2HiveLocation() {
-    String s = properties.get(MOVE_FILE_TO_HIVE_LOCATION);
-    if (StringUtils.isBlank(s)) {
-      return false;
-    }
-    return Boolean.parseBoolean(s);
-  }
-
-  public Map<String, String> getProperties() {
-    return properties;
-  }
-}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/TaskProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/TaskProperties.java
new file mode 100644
index 000000000..6a88da6b3
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/TaskProperties.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing;
+
+import org.apache.amoro.OptimizerProperties;
+import org.apache.amoro.utils.PropertyUtil;
+import org.apache.amoro.utils.map.StructLikeCollections;
+
+import java.util.Map;
+
+public class TaskProperties {
+
+  public static final String TASK_EXECUTOR_FACTORY_IMPL = 
"task-executor-factory-impl";
+
+  public static final String PROCESS_ID = "process-id";
+  public static final String UNKNOWN_PROCESS_ID = "unknown";
+
+  public static final String EXTEND_DISK_STORAGE =
+      OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE;
+  public static final boolean EXTEND_DISK_STORAGE_DEFAULT = false;
+
+  public static final String MEMORY_STORAGE_SIZE =
+      OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE;
+  public static final long MEMORY_STORAGE_SIZE_DEFAULT = 512 * 1024 * 1024;
+
+  public static final String DISK_STORAGE_PATH = 
OptimizerProperties.OPTIMIZER_DISK_STORAGE_PATH;
+
+  public static final String OUTPUT_DIR = "output_location";
+
+  public static final String MOVE_FILE_TO_HIVE_LOCATION = 
"move-files-to-hive-location";
+
+  public static StructLikeCollections getStructLikeCollections(Map<String, 
String> properties) {
+    boolean enableSpillMap =
+        PropertyUtil.propertyAsBoolean(
+            properties, EXTEND_DISK_STORAGE, EXTEND_DISK_STORAGE_DEFAULT);
+    long maxInMemory =
+        PropertyUtil.propertyAsLong(properties, MEMORY_STORAGE_SIZE, 
MEMORY_STORAGE_SIZE_DEFAULT);
+    String spillMapPath = properties.get(DISK_STORAGE_PATH);
+
+    return new StructLikeCollections(enableSpillMap, maxInMemory, 
spillMapPath);
+  }
+
+  public static String getProcessId(Map<String, String> properties) {
+    String processId = properties.get(PROCESS_ID);
+    if (processId == null || processId.trim().isEmpty()) {
+      processId = UNKNOWN_PROCESS_ID;
+    }
+    return processId;
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
index 80634586f..0c0c6a422 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
@@ -21,7 +21,6 @@ package org.apache.amoro.optimizing.plan;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.optimizing.HealthScoreInfo;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.optimizing.RewriteStageTask;
@@ -168,7 +167,7 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
 
   protected abstract TaskSplitter buildTaskSplitter();
 
-  protected abstract OptimizingInputProperties buildTaskProperties();
+  protected abstract Map<String, String> buildTaskProperties();
 
   protected void markSequence(long sequence) {
     if (fromSequence == null || fromSequence > sequence) {
@@ -305,7 +304,7 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
       return rewritePosDataFiles;
     }
 
-    public RewriteStageTask buildTask(OptimizingInputProperties properties) {
+    public RewriteStageTask buildTask(Map<String, String> properties) {
       Set<ContentFile<?>> readOnlyDeleteFiles = Sets.newHashSet();
       Set<ContentFile<?>> rewriteDeleteFiles = Sets.newHashSet();
       for (ContentFile<?> deleteFile : deleteFiles) {
@@ -325,8 +324,7 @@ public abstract class AbstractPartitionPlan implements 
PartitionEvaluator {
       PartitionSpec spec =
           MixedTableUtil.getMixedTablePartitionSpecById(tableObject, 
partition.first());
       String partitionPath = spec.partitionToPath(partition.second());
-      return new RewriteStageTask(
-          identifier.getId(), partitionPath, input, 
properties.getProperties());
+      return new RewriteStageTask(identifier.getId(), partitionPath, input, 
properties);
     }
   }
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
index 03ba74cca..998d08624 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
@@ -21,12 +21,14 @@ package org.apache.amoro.optimizing.plan;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.MixedTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.util.Pair;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class IcebergPartitionPlan extends AbstractPartitionPlan {
@@ -55,9 +57,10 @@ public class IcebergPartitionPlan extends 
AbstractPartitionPlan {
   }
 
   @Override
-  protected OptimizingInputProperties buildTaskProperties() {
-    OptimizingInputProperties properties = new OptimizingInputProperties();
-    
properties.setExecutorFactoryImpl(IcebergRewriteExecutorFactory.class.getName());
+  protected Map<String, String> buildTaskProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, 
IcebergRewriteExecutorFactory.class.getName());
     return properties;
   }
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
index ae38b8758..5d77f76ff 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.data.DataFileType;
 import org.apache.amoro.data.DataTreeNode;
 import org.apache.amoro.data.PrimaryKeyedFile;
 import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -90,9 +90,11 @@ public class MixedIcebergPartitionPlan extends 
AbstractPartitionPlan {
   }
 
   @Override
-  protected OptimizingInputProperties buildTaskProperties() {
-    OptimizingInputProperties properties = new OptimizingInputProperties();
-    
properties.setExecutorFactoryImpl(MixedIcebergRewriteExecutorFactory.class.getName());
+  protected Map<String, String> buildTaskProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
+        MixedIcebergRewriteExecutorFactory.class.getName());
     return properties;
   }
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
index 338108dab..ba002752e 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
@@ -19,12 +19,27 @@
 package org.apache.amoro.utils.map;
 
 import java.io.Closeable;
+import java.util.Objects;
+import java.util.function.BiFunction;
 
-public interface SimpleMap<T, K> extends Closeable {
+public interface SimpleMap<K, V> extends Closeable {
 
-  void put(T key, K value);
+  void put(K key, V value);
 
-  void delete(T key);
+  void delete(K key);
 
-  K get(T key);
+  V get(K key);
+
+  default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends 
V> remappingFunction) {
+    Objects.requireNonNull(remappingFunction);
+    Objects.requireNonNull(value);
+    V oldValue = get(key);
+    V newValue = (oldValue == null) ? value : 
remappingFunction.apply(oldValue, value);
+    if (newValue == null) {
+      delete(key);
+    } else {
+      put(key, newValue);
+    }
+    return newValue;
+  }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
index 487922709..76f8c74c1 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeWrapper;
 
 import java.io.IOException;
+import java.util.function.BiFunction;
 
 public abstract class StructLikeBaseMap<T> implements SimpleMap<StructLike, T> 
{
 
@@ -54,6 +55,12 @@ public abstract class StructLikeBaseMap<T> implements 
SimpleMap<StructLike, T> {
     wrapper.set(null); // don't hold a reference to the key.
   }
 
+  @Override
+  public T merge(
+      StructLike key, T value, BiFunction<? super T, ? super T, ? extends T> 
remappingFunction) {
+    return getInternalMap().merge(structLikeWrapper.copyFor(key), value, 
remappingFunction);
+  }
+
   @Override
   public void close() throws IOException {
     getInternalMap().close();
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
index c357d697e..85957a405 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeWrapper;
 
 import java.util.HashMap;
+import java.util.function.BiFunction;
 
 /** Map implementation for {@link StructLikeWrapper} as the key based on 
memory. */
 public class StructLikeMemoryMap<T> extends StructLikeBaseMap<T> {
@@ -62,6 +63,14 @@ public class StructLikeMemoryMap<T> extends 
StructLikeBaseMap<T> {
       return map.get(key);
     }
 
+    @Override
+    public T merge(
+        StructLikeWrapper key,
+        T value,
+        BiFunction<? super T, ? super T, ? extends T> remappingFunction) {
+      return map.merge(key, value, remappingFunction);
+    }
+
     @Override
     public void close() {
       // do nothing and gc will discard it
diff --git 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
index 01eaf4180..442efaf74 100644
--- 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
@@ -23,6 +23,7 @@ import org.apache.amoro.TableFormat;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.TableTestBase;
 import org.apache.amoro.io.reader.CombinedDeleteFilter;
+import org.apache.amoro.io.reader.DeleteCache;
 import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
@@ -69,19 +70,35 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
 
   private RewriteFilesInput filterEqDeleteScanTask;
 
-  public TestIcebergCombinedReader(boolean partitionedTable, FileFormat 
fileFormat) {
+  public TestIcebergCombinedReader(
+      boolean partitionedTable, FileFormat fileFormat, boolean 
deleteCacheEnabled) {
     super(
         new BasicCatalogTestHelper(TableFormat.ICEBERG),
         new BasicTableTestHelper(false, partitionedTable, 
buildTableProperties(fileFormat)));
     this.fileFormat = fileFormat;
+    if (deleteCacheEnabled) {
+      System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "true");
+    } else {
+      System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "false");
+    }
   }
 
-  @Parameterized.Parameters(name = "partitionedTable = {0}, fileFormat = {1}")
+  @Parameterized.Parameters(
+      name = "partitionedTable = {0}, fileFormat = {1}, deleteCacheEnabled = 
{2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {true, FileFormat.PARQUET}, {false, FileFormat.PARQUET},
-      {true, FileFormat.AVRO}, {false, FileFormat.AVRO},
-      {true, FileFormat.ORC}, {false, FileFormat.ORC}
+      {true, FileFormat.PARQUET, true},
+      {false, FileFormat.PARQUET, true},
+      {true, FileFormat.AVRO, true},
+      {false, FileFormat.AVRO, true},
+      {true, FileFormat.ORC, true},
+      {false, FileFormat.ORC, true},
+      {true, FileFormat.PARQUET, false},
+      {false, FileFormat.PARQUET, false},
+      {true, FileFormat.AVRO, false},
+      {false, FileFormat.AVRO, false},
+      {true, FileFormat.ORC, false},
+      {false, FileFormat.ORC, false}
     };
   }
 
@@ -197,7 +214,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            scanTask);
+            scanTask,
+            "");
     try (CloseableIterable<Record> records = dataReader.readData()) {
       Assert.assertEquals(1, Iterables.size(records));
       Record record = Iterables.getFirst(records, null);
@@ -219,7 +237,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            scanTask);
+            scanTask,
+            "");
     try (CloseableIterable<Record> records = dataReader.readDeletedData()) {
       Assert.assertEquals(2, Iterables.size(records));
       Record first = Iterables.getFirst(records, null);
@@ -243,7 +262,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            dataScanTask);
+            dataScanTask,
+            "");
     try (CloseableIterable<Record> records = dataReader.readData()) {
       Assert.assertEquals(3, Iterables.size(records));
     }
@@ -263,7 +283,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            dataScanTask);
+            dataScanTask,
+            "");
     try (CloseableIterable<Record> records = dataReader.readDeletedData()) {
       Assert.assertEquals(0, Iterables.size(records));
     }
@@ -284,7 +305,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            filterEqDeleteScanTask);
+            filterEqDeleteScanTask,
+            "");
 
     Assert.assertTrue(dataReader.getDeleteFilter().isFilterEqDelete());
 
@@ -363,7 +385,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            task2);
+            task2,
+            "");
     try (CloseableIterable<Record> readRecords = dataReader.readData()) {
       Assert.assertEquals(1, Iterables.size(readRecords));
     }
@@ -437,7 +460,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            task);
+            task,
+            "");
     try (CloseableIterable<Record> readRecords = dataReader.readData()) {
       Assert.assertEquals(0, Iterables.size(readRecords));
     }
@@ -511,7 +535,8 @@ public class TestIcebergCombinedReader extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            task);
+            task,
+            "");
 
     try (CloseableIterable<Record> readRecords = dataReader.readData()) {
       Assert.assertEquals(0, Iterables.size(readRecords));
diff --git 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
index 551d1d76e..40af54ae3 100644
--- 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
@@ -146,7 +146,8 @@ public class TestIcebergCombinedReaderVariousTypes extends 
TableTestBase {
                 IdentityPartitionConverters::convertConstant,
                 false,
                 null,
-                input)
+                input,
+                "")
             .readData();
 
     Assert.assertEquals(Iterables.size(readData), 1);
@@ -197,7 +198,8 @@ public class TestIcebergCombinedReaderVariousTypes extends 
TableTestBase {
             IdentityPartitionConverters::convertConstant,
             false,
             null,
-            input);
+            input,
+            "");
     Assert.assertTrue(reader.getDeleteFilter().isFilterEqDelete());
 
     CloseableIterable<Record> readData = reader.readData();
diff --git 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
index d66c399a8..37b79036c 100644
--- 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
@@ -27,7 +27,6 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
-import org.apache.amoro.utils.map.StructLikeCollections;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -168,7 +167,7 @@ public class IcebergRewriteExecutorTest extends 
TableTestBase {
   @Test
   public void readAllData() throws IOException {
     IcebergRewriteExecutor executor =
-        new IcebergRewriteExecutor(scanTask, getMixedTable(), 
StructLikeCollections.DEFAULT);
+        new IcebergRewriteExecutor(scanTask, getMixedTable(), 
Collections.emptyMap());
 
     RewriteFilesOutput output = executor.execute();
 
@@ -212,7 +211,7 @@ public class IcebergRewriteExecutorTest extends 
TableTestBase {
   @Test
   public void readOnlyData() throws IOException {
     IcebergRewriteExecutor executor =
-        new IcebergRewriteExecutor(dataScanTask, getMixedTable(), 
StructLikeCollections.DEFAULT);
+        new IcebergRewriteExecutor(dataScanTask, getMixedTable(), 
Collections.emptyMap());
 
     RewriteFilesOutput output = executor.execute();
 
diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
index b37a6f34b..2c32301e4 100644
--- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
+++ 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
@@ -24,9 +24,9 @@ import 
org.apache.amoro.io.writer.MixedTreeNodePosDeleteWriter;
 import org.apache.amoro.optimizing.AbstractRewriteFilesExecutor;
 import org.apache.amoro.optimizing.OptimizingDataReader;
 import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.table.WriteOperationKind;
-import org.apache.amoro.utils.map.StructLikeCollections;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
@@ -36,6 +36,7 @@ import org.apache.iceberg.io.FileWriter;
 import org.apache.iceberg.io.TaskWriter;
 
 import java.util.List;
+import java.util.Map;
 
 /** OptimizingExecutor form mixed format */
 public class MixedHiveRewriteExecutor extends AbstractRewriteFilesExecutor {
@@ -43,12 +44,9 @@ public class MixedHiveRewriteExecutor extends 
AbstractRewriteFilesExecutor {
   private final String outputDir;
 
   public MixedHiveRewriteExecutor(
-      RewriteFilesInput input,
-      MixedTable table,
-      StructLikeCollections structLikeCollections,
-      String outputDir) {
-    super(input, table, structLikeCollections);
-    this.outputDir = outputDir;
+      RewriteFilesInput input, MixedTable table, Map<String, String> 
properties) {
+    super(input, table, properties);
+    this.outputDir = properties.get(TaskProperties.OUTPUT_DIR);
   }
 
   @Override
diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
index 9d79c935e..3e94af5d6 100644
--- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
+++ 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
@@ -20,8 +20,8 @@ package org.apache.amoro.hive.optimizing;
 
 import org.apache.amoro.optimizing.OptimizingExecutor;
 import org.apache.amoro.optimizing.OptimizingExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 
 import java.util.Map;
@@ -38,12 +38,7 @@ public class MixedHiveRewriteExecutorFactory
   }
 
   @Override
-  public OptimizingExecutor createExecutor(RewriteFilesInput input) {
-    OptimizingInputProperties optimizingConfig = 
OptimizingInputProperties.parse(properties);
-    return new MixedHiveRewriteExecutor(
-        input,
-        input.getTable(),
-        optimizingConfig.getStructLikeCollections(),
-        optimizingConfig.getOutputDir());
+  public OptimizingExecutor<RewriteFilesOutput> 
createExecutor(RewriteFilesInput input) {
+    return new MixedHiveRewriteExecutor(input, input.getTable(), properties);
   }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
index 004843b7f..ac17e565c 100644
--- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
+++ 
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.data.DataFileType;
 import org.apache.amoro.data.PrimaryKeyedFile;
 import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
 import org.apache.amoro.hive.utils.HiveTableUtil;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.optimizing.plan.CommonPartitionEvaluator;
 import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
 import org.apache.amoro.optimizing.plan.PartitionEvaluator;
@@ -116,13 +116,14 @@ public class MixedHivePartitionPlan extends 
MixedIcebergPartitionPlan {
   }
 
   @Override
-  protected OptimizingInputProperties buildTaskProperties() {
-    OptimizingInputProperties properties = super.buildTaskProperties();
-    
properties.setExecutorFactoryImpl(MixedHiveRewriteExecutorFactory.class.getName());
+  protected Map<String, String> buildTaskProperties() {
+    Map<String, String> properties = super.buildTaskProperties();
+    properties.put(
+        TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, 
MixedHiveRewriteExecutorFactory.class.getName());
     if (moveFiles2CurrentHiveLocation()) {
-      properties.needMoveFile2HiveLocation();
+      properties.put(TaskProperties.MOVE_FILE_TO_HIVE_LOCATION, "true");
     } else if (evaluator().isFullNecessary()) {
-      properties.setOutputDir(constructCustomHiveSubdirectory());
+      properties.put(TaskProperties.OUTPUT_DIR, 
constructCustomHiveSubdirectory());
     }
     return properties;
   }
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
index 7fd1c8b26..544d31f24 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
@@ -79,6 +79,30 @@ public class OptimizerConfig implements Serializable {
   @Option(name = "-id", aliases = "--" + OptimizerProperties.RESOURCE_ID, 
usage = "Resource id")
   private String resourceId;
 
+  @Option(
+      name = "-ce",
+      aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_ENABLED,
+      usage = "Whether enable cache, default false")
+  private boolean cacheEnabled = 
OptimizerProperties.OPTIMIZER_CACHE_ENABLED_DEFAULT;
+
+  @Option(
+      name = "-cmts",
+      aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE,
+      usage = "Max total size in cache, default 128MB")
+  private String cacheMaxTotalSize = 
OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;
+
+  @Option(
+      name = "-cmes",
+      aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE,
+      usage = "Max entry size in cache, default 64MB")
+  private String cacheMaxEntrySize = 
OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;
+
+  @Option(
+      name = "-ct",
+      aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT,
+      usage = "Timeout in cache, default 10minutes")
+  private String cacheTimeout = 
OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT_DEFAULT;
+
   public OptimizerConfig() {}
 
   public OptimizerConfig(String[] args) throws CmdLineException {
@@ -158,6 +182,38 @@ public class OptimizerConfig implements Serializable {
     this.resourceId = resourceId;
   }
 
+  public boolean isCacheEnabled() {
+    return cacheEnabled;
+  }
+
+  public void setCacheEnabled(boolean cacheEnabled) {
+    this.cacheEnabled = cacheEnabled;
+  }
+
+  public String getCacheMaxTotalSize() {
+    return cacheMaxTotalSize;
+  }
+
+  public void setCacheMaxTotalSize(String cacheMaxTotalSize) {
+    this.cacheMaxTotalSize = cacheMaxTotalSize;
+  }
+
+  public String getCacheMaxEntrySize() {
+    return cacheMaxEntrySize;
+  }
+
+  public void setCacheMaxEntrySize(String cacheMaxEntrySize) {
+    this.cacheMaxEntrySize = cacheMaxEntrySize;
+  }
+
+  public String getCacheTimeout() {
+    return cacheTimeout;
+  }
+
+  public void setCacheTimeout(String cacheTimeout) {
+    this.cacheTimeout = cacheTimeout;
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
@@ -167,9 +223,13 @@ public class OptimizerConfig implements Serializable {
         .add("groupName", groupName)
         .add("heartBeat", heartBeat)
         .add("extendDiskStorage", extendDiskStorage)
-        .add("rocksDBBasePath", diskStoragePath)
+        .add("diskStoragePath", diskStoragePath)
         .add("memoryStorageSize", memoryStorageSize)
         .add("resourceId", resourceId)
+        .add("cacheEnabled", cacheEnabled)
+        .add("cacheMaxTotalSize", cacheMaxTotalSize)
+        .add("cacheMaxEntrySize", cacheMaxEntrySize)
+        .add("cacheTimeout", cacheTimeout)
         .toString();
   }
 }
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index d40e7ec94..6b1dc0a6c 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -20,10 +20,12 @@ package org.apache.amoro.optimizer.common;
 
 import org.apache.amoro.api.OptimizingTask;
 import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.io.reader.DeleteCache;
 import org.apache.amoro.optimizing.OptimizingExecutor;
 import org.apache.amoro.optimizing.OptimizingExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
 import org.apache.amoro.utils.ExceptionUtil;
 import org.apache.amoro.utils.SerializationUtil;
@@ -32,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 public class OptimizerExecutor extends AbstractOptimizerOperator {
 
@@ -147,24 +150,18 @@ public class OptimizerExecutor extends 
AbstractOptimizerOperator {
   public static OptimizingTaskResult executeTask(
       OptimizerConfig config, int threadId, OptimizingTask task, Logger 
logger) {
     long startTime = System.currentTimeMillis();
-    TableOptimizing.OptimizingInput input = null;
+    TableOptimizing.OptimizingInput input;
     try {
-      OptimizingInputProperties properties = 
OptimizingInputProperties.parse(task.getProperties());
+      Map<String, String> taskProperties = fillTaskProperties(config, task);
       input = SerializationUtil.simpleDeserialize(task.getTaskInput());
-      String executorFactoryImpl = properties.getExecutorFactoryImpl();
+      String executorFactoryImpl = 
taskProperties.get(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL);
       DynConstructors.Ctor<OptimizingExecutorFactory> ctor =
           DynConstructors.builder(OptimizingExecutorFactory.class)
               .impl(executorFactoryImpl)
               .buildChecked();
       OptimizingExecutorFactory factory = ctor.newInstance();
 
-      if (config.isExtendDiskStorage()) {
-        properties.enableSpillMap();
-      }
-      properties.setMaxSizeInMemory(config.getMemoryStorageSize() * 1024 * 
1024);
-      properties.setSpillMapPath(config.getDiskStoragePath());
-      factory.initialize(properties.getProperties());
-
+      factory.initialize(taskProperties);
       OptimizingExecutor executor = factory.createExecutor(input);
       TableOptimizing.OptimizingOutput output = executor.execute();
       ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output);
@@ -190,4 +187,31 @@ public class OptimizerExecutor extends 
AbstractOptimizerOperator {
       return errorResult;
     }
   }
+
+  private static Map<String, String> fillTaskProperties(
+      OptimizerConfig config, OptimizingTask task) {
+    if (config.isCacheEnabled()) {
+      System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "true");
+    }
+    if 
(!config.getCacheMaxEntrySize().equals(DeleteCache.DELETE_CACHE_MAX_ENTRY_SIZE_DEFAULT))
 {
+      System.setProperty(DeleteCache.DELETE_CACHE_MAX_ENTRY_SIZE, 
config.getCacheMaxEntrySize());
+    }
+    if 
(!config.getCacheMaxTotalSize().equals(DeleteCache.DELETE_CACHE_MAX_TOTAL_SIZE_DEFAULT))
 {
+      System.setProperty(
+          DeleteCache.DELETE_CACHE_MAX_TOTAL_SIZE, 
config.getCacheMaxTotalSize());
+    }
+    if (!config.getCacheTimeout().equals(DeleteCache.DELETE_CACHE_TIMEOUT_DEFAULT)) {
+      System.setProperty(DeleteCache.DELETE_CACHE_TIMEOUT, 
config.getCacheTimeout());
+    }
+    Map<String, String> properties = Maps.newHashMap(task.getProperties());
+    properties.put(TaskProperties.PROCESS_ID, 
String.valueOf(task.getTaskId().getProcessId()));
+    if (config.isExtendDiskStorage()) {
+      properties.put(TaskProperties.EXTEND_DISK_STORAGE, "true");
+    }
+    properties.put(
+        TaskProperties.MEMORY_STORAGE_SIZE,
+        String.valueOf(config.getMemoryStorageSize() * 1024 * 1024));
+    properties.put(TaskProperties.DISK_STORAGE_PATH, 
config.getDiskStoragePath());
+    return properties;
+  }
 }
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
 
b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
index 6909ddc02..5bacb7364 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
@@ -25,8 +25,8 @@ import org.apache.amoro.api.OptimizingTaskResult;
 import org.apache.amoro.optimizing.BaseOptimizingInput;
 import org.apache.amoro.optimizing.OptimizingExecutor;
 import org.apache.amoro.optimizing.OptimizingExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
 import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.optimizing.TaskProperties;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
 import org.apache.amoro.utils.SerializationUtil;
@@ -129,8 +129,7 @@ public class TestOptimizerExecutor extends 
OptimizerTestBase {
       optimizingTask.setTaskInput(SerializationUtil.simpleSerialize(this));
       Map<String, String> inputProperties = Maps.newHashMap();
       inputProperties.put(
-          OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
-          TestOptimizingExecutorFactory.class.getName());
+          TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, 
TestOptimizingExecutorFactory.class.getName());
       optimizingTask.setProperties(inputProperties);
       return optimizingTask;
     }
diff --git a/docs/admin-guides/managing-optimizers.md 
b/docs/admin-guides/managing-optimizers.md
index 61d86ab61..aebea60bd 100644
--- a/docs/admin-guides/managing-optimizers.md
+++ b/docs/admin-guides/managing-optimizers.md
@@ -282,9 +282,13 @@ The optimizer group supports the following properties:
 | Property                       | Container type | Required | Default         
                                                                      | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                      [...]
 
|--------------------------------|----------------|----------|---------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | scheduling-policy              | All            | No       | quota           
                                                                      | The 
scheduler group scheduling policy, the default value is `quota`, it will be 
scheduled according to the quota resources configured for each table, the 
larger the table quota is, the more optimizer resources it can take. There is 
also a configuration `balanced` that will balance the scheduling of each table, 
the longer the table has not  [...]
-| memory                         | Local          | Yes      | N/A             
                                                                      | The max 
memory of JVM for local optimizer, in MBs.                                      
                                                                                
                                                                                
                                                                                
              [...]
 | max-input-file-size-per-thread | All            | No       | 
536870912(512MB)                                                                
      | Max input file size per optimize thread.                                
                                                                                
                                                                                
                                                                                
                              [...]
 | ams-optimizing-uri             | All            | No       | 
thrift://{ams.server-expose-host}:{ams.thrift-server.optimizing-service.binding-port}
 | Table optimizing service endpoint. This is used when the default service 
endpoint is not visitable.                                                      
                                                                                
                                                                                
                             [...]
+| cache-enabled                  | All            | No       | false           
                                                                      | Whether 
enable cache in optimizer.                                                      
                                                                                
                                                                                
                                                                                
              [...]
+| cache-max-total-size           | All            | No       | 128mb           
                                                                      | Max 
total size in optimizer cache.
                                                                                
                                                                                
                                                                                
                  [...]
+| cache-max-entry-size           | All            | No       | 64mb            
                                                                      | Max 
entry size in optimizer cache.                                                  
                                                                                
                                                                                
                                                                                
                  [...]
+| cache-timeout                  | All            | No       | 10min           
                                                                      | Timeout 
in optimizer cache.                                                             
                                                                                
                                                                                
                                                                                
              [...]
+| memory                         | Local          | Yes      | N/A             
                                                                      | The max 
memory of JVM for local optimizer, in MBs.                                      
                                                                                
                                                                                
                                                                                
              [...]
 | flink-conf.\<key\>             | Flink          | No       | N/A             
                                                                      | Any 
flink config options could be overwritten, priority is optimizing-group > 
optimizing-container > flink-conf.yaml.                                         
                                                                                
                                                                                
                        [...]
 | spark-conf.\<key\>             | Spark          | No       | N/A             
                                                                      | Any 
spark config options could be overwritten, priority is optimizing-group > 
optimizing-container > spark-defaults.conf.                                     
                                                                                
                                                                                
                        [...]
 
@@ -351,6 +355,10 @@ The description of the relevant parameters is shown in the 
following table:
 | -eds     | No       | Whether extend storage to disk, default false.         
                                                                                
                                                                                
                   |
 | -dsp     | No       | Defines the directory where the storage files are 
saved, the default temporary-file directory is specified by the system property 
`java.io.tmpdir`. On UNIX systems the default value of this property is 
typically "/tmp" or "/var/tmp". |
 | -msz     | No       | Memory storage size limit when extending disk 
storage(MB), default 512(MB).                                                   
                                                                                
                            |
+| -ce      | No       | Whether to enable cache in optimizer, default false.   
                                                                                
                                                                                
                   |
+| -cmts    | No       | Max total size in optimizer cache, default 128MB.      
                                                                                
                                                                                
                   |
+| -cmes    | No       | Max entry size in optimizer cache, default 64MB.       
                                                                                
                                                                                
                   |
+| -ct      | No       | Timeout in optimizer cache, default 10min.             
                                                                                
                                                                                
                   |
 
 
 Or you can submit optimizer in your own Spark task development platform or 
local Spark environment with the following configuration. The main parameters 
include:
@@ -378,3 +386,7 @@ The description of the relevant parameters is shown in the 
following table:
 | -eds     | No       | Whether extend storage to disk, default false.         
                                                                                
                                                                                
                   |
 | -dsp     | No       | Defines the directory where the storage files are 
saved, the default temporary-file directory is specified by the system property 
`java.io.tmpdir`. On UNIX systems the default value of this property is 
typically "/tmp" or "/var/tmp". |
 | -msz     | No       | Memory storage size limit when extending disk 
storage(MB), default 512(MB).                                                   
                                                                                
                            |
+| -ce      | No       | Whether to enable cache in optimizer, default false.   
                                                                                
                                                                                
                   |
+| -cmts    | No       | Max total size in optimizer cache, default 128MB.      
                                                                                
                                                                                
                   |
+| -cmes    | No       | Max entry size in optimizer cache, default 64MB.       
                                                                                
                                                                                
                   |
+| -ct      | No       | Timeout in optimizer cache, default 10min.             
                                                                                
                                                                                
                   |

Reply via email to