This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 715320f89 [AMORO-2553] Add delete file cache for Iceberg optimizer
(#3793)
715320f89 is described below
commit 715320f8908784b86e69ceca269ac5d62ade4238
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Oct 15 17:38:33 2025 +0800
[AMORO-2553] Add delete file cache for Iceberg optimizer (#3793)
* Add delete cache for iceberg rewrite executor
* Fix some compile errors
* Add merge method to structLikeMap to improve compaction performance
* Access getOrReadEqDeletes via reflection
* Add configuration for delete cache
* Fix a unit test error
* Change delete-cache-enabled to false by default
* Fix optimizer config name -ce
* Fix some issues according to the comments.
---
.../server/manager/AbstractOptimizerContainer.java | 30 +++-
.../server/optimizing/UnKeyedTableCommit.java | 12 +-
.../optimizing/flow/CompleteOptimizingFlow.java | 9 +-
.../checker/FullOptimizingMove2HiveChecker.java | 10 +-
.../checker/FullOptimizingWrite2HiveChecker.java | 10 +-
.../plan/TestHiveKeyedPartitionPlan.java | 7 +-
.../plan/TestHiveUnkeyedPartitionPlan.java | 7 +-
.../optimizing/plan/TestIcebergPartitionPlan.java | 5 +-
.../optimizing/plan/TestKeyedPartitionPlan.java | 4 +-
.../optimizing/plan/TestOptimizingPlanner.java | 4 +-
.../optimizing/plan/TestUnkeyedPartitionPlan.java | 4 +-
.../java/org/apache/amoro/OptimizerProperties.java | 8 +
.../amoro/io/reader/CombinedDeleteFilter.java | 94 +++++-----
.../org/apache/amoro/io/reader/DeleteCache.java | 191 +++++++++++++++++++++
.../reader/GenericCombinedIcebergDataReader.java | 10 +-
.../optimizing/AbstractRewriteFilesExecutor.java | 7 +-
.../amoro/optimizing/IcebergRewriteExecutor.java | 11 +-
.../optimizing/IcebergRewriteExecutorFactory.java | 6 +-
.../optimizing/MixedIcebergRewriteExecutor.java | 9 +-
.../MixedIcebergRewriteExecutorFactory.java | 7 +-
.../optimizing/OptimizingExecutorFactory.java | 2 +-
.../optimizing/OptimizingInputProperties.java | 117 -------------
.../apache/amoro/optimizing/TaskProperties.java | 66 +++++++
.../optimizing/plan/AbstractPartitionPlan.java | 8 +-
.../optimizing/plan/IcebergPartitionPlan.java | 11 +-
.../optimizing/plan/MixedIcebergPartitionPlan.java | 10 +-
.../java/org/apache/amoro/utils/map/SimpleMap.java | 23 ++-
.../apache/amoro/utils/map/StructLikeBaseMap.java | 7 +
.../amoro/utils/map/StructLikeMemoryMap.java | 9 +
.../apache/amoro/io/TestIcebergCombinedReader.java | 51 ++++--
.../io/TestIcebergCombinedReaderVariousTypes.java | 6 +-
.../optimizing/IcebergRewriteExecutorTest.java | 5 +-
.../hive/optimizing/MixedHiveRewriteExecutor.java | 12 +-
.../MixedHiveRewriteExecutorFactory.java | 11 +-
.../optimizing/plan/MixedHivePartitionPlan.java | 13 +-
.../amoro/optimizer/common/OptimizerConfig.java | 62 ++++++-
.../amoro/optimizer/common/OptimizerExecutor.java | 46 +++--
.../optimizer/common/TestOptimizerExecutor.java | 5 +-
docs/admin-guides/managing-optimizers.md | 14 +-
39 files changed, 622 insertions(+), 301 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
index 9e8017429..16628406d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
@@ -23,6 +23,7 @@ import org.apache.amoro.resource.InternalResourceContainer;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceStatus;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.utils.PropertyUtil;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
@@ -91,7 +92,7 @@ public abstract class AbstractOptimizerContainer implements
InternalResourceCont
.append(" -hb ")
.append(resource.getProperties().get(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL));
}
- if (org.apache.iceberg.util.PropertyUtil.propertyAsBoolean(
+ if (PropertyUtil.propertyAsBoolean(
resource.getProperties(),
OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE,
OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT)) {
@@ -105,6 +106,33 @@ public abstract class AbstractOptimizerContainer
implements InternalResourceCont
resource.getProperties().get(OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE));
}
}
+ if (PropertyUtil.propertyAsBoolean(
+ resource.getProperties(),
+ OptimizerProperties.OPTIMIZER_CACHE_ENABLED,
+ OptimizerProperties.OPTIMIZER_CACHE_ENABLED_DEFAULT)) {
+ stringBuilder.append(" -ce ");
+ if (resource
+ .getProperties()
+ .containsKey(OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE)) {
+ stringBuilder
+ .append(" -cmts ")
+ .append(
+
resource.getProperties().get(OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE));
+ }
+ if (resource
+ .getProperties()
+ .containsKey(OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE)) {
+ stringBuilder
+ .append(" -cmes ")
+ .append(
+
resource.getProperties().get(OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE));
+ }
+ if
(resource.getProperties().containsKey(OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT))
{
+ stringBuilder
+ .append(" -ct ")
+
.append(resource.getProperties().get(OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT));
+ }
+ }
if (StringUtils.isNotEmpty(resource.getResourceId())) {
stringBuilder.append(" -id ").append(resource.getResourceId());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
index a7bea9742..d71ae484b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
@@ -31,9 +31,9 @@ import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.op.SnapshotSummary;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.optimizing.TaskRuntime.Status;
import org.apache.amoro.server.utils.IcebergTableUtil;
@@ -43,6 +43,7 @@ import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.MixedTableUtil;
+import org.apache.amoro.utils.PropertyUtil;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.amoro.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
@@ -328,8 +329,13 @@ public class UnKeyedTableCommit {
}
protected boolean needMoveFile2Hive() {
- return
OptimizingInputProperties.parse(tasks.stream().findAny().get().getProperties())
- .getMoveFile2HiveLocation();
+ return PropertyUtil.propertyAsBoolean(
+ tasks.stream()
+ .findAny()
+ .orElseThrow(() -> new RuntimeException("The tasks is empty"))
+ .getProperties(),
+ TaskProperties.MOVE_FILE_TO_HIVE_LOCATION,
+ false);
}
protected void correctHiveData(Set<DataFile> addedDataFiles, Set<DeleteFile>
addedDeleteFiles)
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
index e12c53403..f26773395 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
@@ -32,7 +32,6 @@ import
org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutor;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.optimizing.IcebergRewriteExecutor;
import org.apache.amoro.optimizing.OptimizingExecutor;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
@@ -45,7 +44,6 @@ import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.TablePropertyUtil;
-import org.apache.amoro.utils.map.StructLikeCollections;
import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
@@ -213,13 +211,10 @@ public class CompleteOptimizingFlow {
TaskRuntime<RewriteStageTask> taskRuntime) {
if (table.format() == TableFormat.ICEBERG) {
return new IcebergRewriteExecutor(
- taskRuntime.getTaskDescriptor().getInput(), table,
StructLikeCollections.DEFAULT);
+ taskRuntime.getTaskDescriptor().getInput(), table,
taskRuntime.getProperties());
} else {
return new MixedHiveRewriteExecutor(
- taskRuntime.getTaskDescriptor().getInput(),
- table,
- StructLikeCollections.DEFAULT,
-
OptimizingInputProperties.parse(taskRuntime.getProperties()).getOutputDir());
+ taskRuntime.getTaskDescriptor().getInput(), table,
taskRuntime.getProperties());
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
index 9b99be1d8..dab0707ae 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingMove2HiveChecker.java
@@ -18,13 +18,14 @@
package org.apache.amoro.server.optimizing.flow.checker;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
import org.apache.amoro.server.optimizing.UnKeyedTableCommit;
import org.apache.amoro.server.optimizing.flow.view.TableDataView;
import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.utils.PropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import javax.annotation.Nullable;
@@ -45,8 +46,9 @@ public class FullOptimizingMove2HiveChecker extends
AbstractHiveChecker {
@Nullable UnKeyedTableCommit latestCommit) {
return CollectionUtils.isNotEmpty(latestTaskDescriptors)
&& latestPlanner.getOptimizingType() == OptimizingType.FULL
- && OptimizingInputProperties.parse(
- latestTaskDescriptors.stream().findAny().get().getProperties())
- .getMoveFile2HiveLocation();
+ && PropertyUtil.propertyAsBoolean(
+ latestTaskDescriptors.stream().findAny().get().getProperties(),
+ TaskProperties.MOVE_FILE_TO_HIVE_LOCATION,
+ false);
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
index 105e68a65..98179406d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/checker/FullOptimizingWrite2HiveChecker.java
@@ -18,9 +18,9 @@
package org.apache.amoro.server.optimizing.flow.checker;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
import org.apache.amoro.server.optimizing.UnKeyedTableCommit;
import org.apache.amoro.server.optimizing.flow.view.TableDataView;
@@ -45,9 +45,11 @@ public class FullOptimizingWrite2HiveChecker extends
AbstractHiveChecker {
@Nullable UnKeyedTableCommit latestCommit) {
return CollectionUtils.isNotEmpty(latestTaskDescriptors)
&& latestPlanner.getOptimizingType() == OptimizingType.FULL
- && OptimizingInputProperties.parse(
-
latestTaskDescriptors.stream().findAny().get().getProperties())
- .getOutputDir()
+ && latestTaskDescriptors.stream()
+ .findAny()
+ .get()
+ .getProperties()
+ .get(TaskProperties.OUTPUT_DIR)
!= null;
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
index 412defbd9..387b18231 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
@@ -28,7 +28,7 @@ import org.apache.amoro.hive.catalog.HiveTableTestHelper;
import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
import org.apache.amoro.hive.optimizing.plan.MixedHivePartitionPlan;
import org.apache.amoro.hive.table.SupportHive;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.optimizing.OptimizingTestHelpers;
@@ -72,7 +72,7 @@ public class TestHiveKeyedPartitionPlan extends
TestKeyedPartitionPlan {
@Override
protected void assertTaskProperties(Map<String, String> expect, Map<String,
String> actual) {
actual = Maps.newHashMap(actual);
- String outputDir = actual.remove(OptimizingInputProperties.OUTPUT_DIR);
+ String outputDir = actual.remove(TaskProperties.OUTPUT_DIR);
if (outputDir != null) {
Assert.assertTrue(Long.parseLong(outputDir.split("_")[1]) > 0);
}
@@ -136,8 +136,7 @@ public class TestHiveKeyedPartitionPlan extends
TestKeyedPartitionPlan {
protected Map<String, String> buildTaskProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
- MixedHiveRewriteExecutorFactory.class.getName());
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
MixedHiveRewriteExecutorFactory.class.getName());
return properties;
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
index 82097d92c..2d0e4cfc1 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
@@ -27,7 +27,7 @@ import org.apache.amoro.hive.catalog.HiveTableTestHelper;
import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
import org.apache.amoro.hive.optimizing.plan.MixedHivePartitionPlan;
import org.apache.amoro.hive.table.SupportHive;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.optimizing.OptimizingTestHelpers;
@@ -71,7 +71,7 @@ public class TestHiveUnkeyedPartitionPlan extends
TestUnkeyedPartitionPlan {
@Override
protected void assertTaskProperties(Map<String, String> expect, Map<String,
String> actual) {
actual = Maps.newHashMap(actual);
- String outputDir = actual.remove(OptimizingInputProperties.OUTPUT_DIR);
+ String outputDir = actual.remove(TaskProperties.OUTPUT_DIR);
if (outputDir != null) {
Assert.assertTrue(Long.parseLong(outputDir.split("_")[1]) > 0);
}
@@ -136,8 +136,7 @@ public class TestHiveUnkeyedPartitionPlan extends
TestUnkeyedPartitionPlan {
protected Map<String, String> buildTaskProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
- MixedHiveRewriteExecutorFactory.class.getName());
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
MixedHiveRewriteExecutorFactory.class.getName());
return properties;
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
index 2effe4c84..3d67b62c8 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.optimizing.plan.IcebergPartitionPlan;
import org.apache.amoro.optimizing.scan.IcebergTableFileScanHelper;
@@ -95,8 +95,7 @@ public class TestIcebergPartitionPlan extends
TestUnkeyedPartitionPlan {
protected Map<String, String> buildTaskProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
- IcebergRewriteExecutorFactory.class.getName());
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
IcebergRewriteExecutorFactory.class.getName());
return properties;
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
index 4a4b38472..1984f31d8 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -25,8 +25,8 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.data.ChangeAction;
import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
import org.apache.amoro.optimizing.scan.KeyedTableFileScanHelper;
@@ -235,7 +235,7 @@ public class TestKeyedPartitionPlan extends
MixedTablePlanTestBase {
protected Map<String, String> buildTaskProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
MixedIcebergRewriteExecutorFactory.class.getName());
return properties;
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
index be189c29b..051798b7b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
@@ -25,9 +25,9 @@ import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
import org.apache.amoro.optimizing.scan.TableFileScanHelper;
import org.apache.amoro.server.utils.IcebergTableUtil;
@@ -113,7 +113,7 @@ public class TestOptimizingPlanner extends
TestOptimizingEvaluator {
protected Map<String, String> buildTaskProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
MixedIcebergRewriteExecutorFactory.class.getName());
return properties;
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
index 33d7aa33b..558ace42b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.AbstractPartitionPlan;
import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
import org.apache.amoro.optimizing.scan.TableFileScanHelper;
@@ -106,7 +106,7 @@ public class TestUnkeyedPartitionPlan extends
MixedTablePlanTestBase {
protected Map<String, String> buildTaskProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
MixedIcebergRewriteExecutorFactory.class.getName());
return properties;
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
index f83912d9c..bea9b90f9 100644
--- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
@@ -39,4 +39,12 @@ public class OptimizerProperties {
public static final String OPTIMIZER_MEMORY_STORAGE_SIZE =
"memory-storage-size";
public static final String MAX_INPUT_FILE_SIZE_PER_THREAD =
"max-input-file-size-per-thread";
public static final Long MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT = 512 * 1024
* 1024L; // 512MB
+ public static final String OPTIMIZER_CACHE_ENABLED = "cache-enabled";
+ public static final boolean OPTIMIZER_CACHE_ENABLED_DEFAULT = false;
+ public static final String OPTIMIZER_CACHE_MAX_ENTRY_SIZE =
"cache-max-entry-size";
+ public static final String OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT = "64mb";
+ public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE =
"cache-max-total-size";
+ public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
+ public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
+ public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index 830976ea2..a18b2ab81 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -23,7 +23,6 @@ import org.apache.amoro.io.CloseablePredicate;
import org.apache.amoro.optimizing.RewriteFilesInput;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
@@ -31,6 +30,7 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
import org.apache.amoro.utils.ContentFiles;
+import org.apache.amoro.utils.DynMethods;
import org.apache.amoro.utils.map.StructLikeBaseMap;
import org.apache.amoro.utils.map.StructLikeCollections;
import org.apache.iceberg.Accessor;
@@ -41,6 +41,8 @@ import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
@@ -64,11 +66,12 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -91,6 +94,12 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
private static final Accessor<StructLike> POSITION_ACCESSOR =
POS_DELETE_SCHEMA.accessorForField(MetadataColumns.DELETE_FILE_POS.fieldId());
+ // BaseDeleteLoader#getOrReadEqDeletes is a private method, accessed it via
reflection
+ private static final DynMethods.UnboundMethod READ_EQ_DELETES_METHOD =
+ DynMethods.builder("getOrReadEqDeletes")
+ .hiddenImpl(BaseDeleteLoader.class, DeleteFile.class, Schema.class)
+ .build();
+
@VisibleForTesting public static long FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT
= 1000000L;
private final RewriteFilesInput input;
@@ -117,11 +126,14 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
private final long dataRecordCnt;
private final boolean filterEqDelete;
+ private final DeleteLoader deleteLoader;
+ private final String deleteGroup;
protected CombinedDeleteFilter(
RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
- StructLikeCollections structLikeCollections) {
+ StructLikeCollections structLikeCollections,
+ String deleteGroup) {
this.input = rewriteFilesInput;
this.dataRecordCnt =
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
@@ -157,6 +169,14 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
this.structLikeCollections = structLikeCollections;
}
this.filterEqDelete = filterEqDelete();
+ this.deleteGroup = deleteGroup;
+ if (Boolean.parseBoolean(
+ System.getProperty(
+ DeleteCache.DELETE_CACHE_ENABLED,
DeleteCache.DELETE_CACHE_ENABLED_DEFAULT))) {
+ this.deleteLoader = new CachingDeleteLoader(this::getInputFile);
+ } else {
+ this.deleteLoader = new BaseDeleteLoader(this::getInputFile);
+ }
}
/**
@@ -311,44 +331,22 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
Schema deleteSchema = deleteSchemaEntry.getValue();
Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
- InternalRecordWrapper internalRecordWrapper =
- new InternalRecordWrapper(deleteSchema.asStruct());
-
- CloseableIterable<RecordWithLsn> deleteRecords =
- CloseableIterable.transform(
- CloseableIterable.concat(
- Iterables.transform(
- eqDeletes,
- s ->
- CloseableIterable.transform(
- openFile(s, deleteSchema),
- r -> new RecordWithLsn(s.dataSequenceNumber(),
r)))),
- RecordWithLsn::recordCopy);
-
StructLikeBaseMap<Long> structLikeMap =
structLikeCollections.createStructLikeMap(deleteSchema.asStruct());
+ structMapCloseable.add(structLikeMap);
- // init map
- try (CloseableIterable<RecordWithLsn> deletes = deleteRecords) {
- Iterator<RecordWithLsn> it =
- getFileIO() == null ? deletes.iterator() :
getFileIO().doAs(deletes::iterator);
- while (it.hasNext()) {
- RecordWithLsn recordWithLsn = it.next();
- StructLike deletePK =
internalRecordWrapper.copyFor(recordWithLsn.getRecord());
- if (filterEqDelete && !bloomFilter.mightContain(deletePK)) {
+ for (DeleteFile deleteFile : eqDeletes) {
+ for (StructLike record : openEqualityDeletes(deleteFile, deleteSchema)) {
+ if (filterEqDelete && !bloomFilter.mightContain(record)) {
continue;
}
- Long lsn = recordWithLsn.getLsn();
- Long old = structLikeMap.get(deletePK);
- if (old == null || old.compareTo(lsn) <= 0) {
- structLikeMap.put(deletePK, lsn);
- }
+ Long lsn = deleteFile.dataSequenceNumber();
+ structLikeMap.merge(record, lsn, Math::max);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
- structMapCloseable.add(structLikeMap);
+ InternalRecordWrapper internalRecordWrapper =
+ new InternalRecordWrapper(deleteSchema.asStruct());
return structForDelete -> {
StructProjection deleteProjection =
StructProjection.create(requiredSchema,
deleteSchema).wrap(structForDelete.getPk());
@@ -444,6 +442,10 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
return openFile(file, POS_DELETE_SCHEMA);
}
+ private Iterable<StructLike> openEqualityDeletes(DeleteFile file, Schema
deleteSchema) {
+ return READ_EQ_DELETES_METHOD.invoke(deleteLoader, file, deleteSchema);
+ }
+
private CloseableIterable<Record> openFile(ContentFile<?> contentFile,
Schema deleteSchema) {
InputFile input = getInputFile(contentFile);
switch (contentFile.format()) {
@@ -477,26 +479,22 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
}
}
- static class RecordWithLsn {
- private final Long lsn;
- private Record record;
-
- public RecordWithLsn(Long lsn, Record record) {
- this.lsn = lsn;
- this.record = record;
- }
+ class CachingDeleteLoader extends BaseDeleteLoader {
+ private final DeleteCache cache;
- public Long getLsn() {
- return lsn;
+ public CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+ super(loadInputFile);
+ this.cache = DeleteCache.getInstance();
}
- public Record getRecord() {
- return record;
+ @Override
+ protected boolean canCache(long size) {
+ return cache != null && size < cache.maxEntrySize();
}
- public RecordWithLsn recordCopy() {
- record = record.copy();
- return this;
+ @Override
+ protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long
valueSize) {
+ return cache.getOrLoad(CombinedDeleteFilter.this.deleteGroup, key,
valueSupplier, valueSize);
}
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/DeleteCache.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/DeleteCache.java
new file mode 100644
index 000000000..33d50da02
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/DeleteCache.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.io.reader;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.amoro.config.ConfigHelpers;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.utils.MemorySize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * A cache for reducing the computation and IO overhead for iceberg delete
files.
+ *
+ * <p>The cache is configured and controlled through JVM system properties. It
supports both limits
+ * on the total cache size and maximum size for individual entries.
Additionally, it implements
+ * automatic eviction of entries after a specified duration of inactivity.
+ *
+ * <p>The cache is accessed and populated via {@link #getOrLoad(String,
String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided
supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it
must be associated
+ * with a particular group ID. Once the group is no longer needed, it is
recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of
relying on automatic
+ * eviction.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one
cache exists per JVM.
+ */
+public class DeleteCache {
+ private static final Logger LOG = LoggerFactory.getLogger(DeleteCache.class);
+ public static final String DELETE_CACHE_ENABLED = "delete-cache-enabled";
+ public static final String DELETE_CACHE_ENABLED_DEFAULT = "false";
+ public static final String DELETE_CACHE_MAX_ENTRY_SIZE =
"delete-cache-max-entry-size";
+ public static final String DELETE_CACHE_MAX_ENTRY_SIZE_DEFAULT = "64mb";
+ public static final String DELETE_CACHE_MAX_TOTAL_SIZE =
"delete-cache-max-total-size";
+ public static final String DELETE_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
+ public static final String DELETE_CACHE_TIMEOUT = "delete-cache-timeout";
+ public static final String DELETE_CACHE_TIMEOUT_DEFAULT = "10min";
+ private static final int MAX_GROUPS = 5;
+ private static volatile DeleteCache INSTANCE;
+
+ private final Duration timeout;
+ private final long maxEntrySize;
+ private final long maxTotalSize;
+ private final List<String> groups = new CopyOnWriteArrayList<>();
+ private volatile Cache<String, CacheValue> state;
+
+ private DeleteCache(Duration timeout, long maxEntrySize, long maxTotalSize) {
+ this.timeout = timeout;
+ this.maxEntrySize = maxEntrySize;
+ this.maxTotalSize = maxTotalSize;
+ }
+
+ public static DeleteCache getInstance() {
+ if (INSTANCE == null) {
+ long maxEntrySize =
+ MemorySize.parseBytes(
+ System.getProperty(DELETE_CACHE_MAX_ENTRY_SIZE,
DELETE_CACHE_MAX_ENTRY_SIZE_DEFAULT));
+ long maxTotalSize =
+ MemorySize.parseBytes(
+ System.getProperty(DELETE_CACHE_MAX_TOTAL_SIZE,
DELETE_CACHE_MAX_TOTAL_SIZE_DEFAULT));
+ Duration timeout =
+ ConfigHelpers.TimeUtils.parseDuration(
+ System.getProperty(DELETE_CACHE_TIMEOUT,
DELETE_CACHE_TIMEOUT_DEFAULT));
+ initialInstance(timeout, maxEntrySize, maxTotalSize);
+ }
+ return INSTANCE;
+ }
+
+ public static synchronized void initialInstance(
+ Duration timeout, long maxEntrySize, long maxTotalSize) {
+ if (INSTANCE == null) {
+ INSTANCE = new DeleteCache(timeout, maxEntrySize, maxTotalSize);
+ } else {
+ LOG.warn("Cache is already initialed.");
+ }
+ }
+
+ public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier,
long valueSize) {
+ if (valueSize > maxEntrySize) {
+ LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize,
maxEntrySize);
+ return valueSupplier.get();
+ }
+ if (!groups.contains(group)) {
+ if (groups.size() > MAX_GROUPS) {
+ String removed = groups.remove(groups.size() - 1);
+ if (removed != null) {
+ invalidate(removed);
+ }
+ }
+ groups.add(group);
+ }
+ String internalKey = group + "_" + key;
+ CacheValue value = state().get(internalKey, loadFunc(valueSupplier,
valueSize));
+ Preconditions.checkNotNull(value, "Loaded value must not be null");
+ return value.get();
+ }
+
+ public void invalidate(String group) {
+ if (state != null && group != null) {
+ List<String> internalKeys = findInternalKeys(group);
+ LOG.info("Invalidating {} keys associated with {}", internalKeys.size(),
group);
+ internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+ LOG.info("Current cache stats {}", state.stats());
+ }
+ }
+
+ private List<String> findInternalKeys(String group) {
+ return state.asMap().keySet().stream()
+ .filter(internalKey -> internalKey.startsWith(group))
+ .collect(Collectors.toList());
+ }
+
+ private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier,
long valueSize) {
+ return key -> {
+ long start = System.currentTimeMillis();
+ V value = valueSupplier.get();
+ long end = System.currentTimeMillis();
+ LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end -
start));
+ return new CacheValue(value, valueSize);
+ };
+ }
+
+ private Cache<String, CacheValue> state() {
+ if (state == null) {
+ synchronized (this) {
+ if (state == null) {
+ LOG.info("Initializing cache state");
+ this.state = initState();
+ }
+ }
+ }
+ return state;
+ }
+
+ private Cache<String, CacheValue> initState() {
+ return Caffeine.newBuilder()
+ .expireAfterAccess(timeout)
+ .maximumWeight(maxTotalSize)
+ .weigher((key, value) -> ((CacheValue) value).weight())
+ .recordStats()
+ .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})",
key, cause))
+ .build();
+ }
+
+ public long maxEntrySize() {
+ return maxEntrySize;
+ }
+
+ static class CacheValue {
+ private final Object value;
+ private final long size;
+
+ CacheValue(Object value, long size) {
+ this.value = value;
+ this.size = size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V> V get() {
+ return (V) value;
+ }
+
+ public int weight() {
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
+ }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
index c16d23394..7da19ee9d 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/GenericCombinedIcebergDataReader.java
@@ -84,7 +84,8 @@ public class GenericCombinedIcebergDataReader implements
OptimizingDataReader {
BiFunction<Type, Object, Object> convertConstant,
boolean reuseContainer,
StructLikeCollections structLikeCollections,
- RewriteFilesInput rewriteFilesInput) {
+ RewriteFilesInput rewriteFilesInput,
+ String deleteGroup) {
this.tableSchema = tableSchema;
this.spec = spec;
this.encryptionManager = encryptionManager;
@@ -95,7 +96,7 @@ public class GenericCombinedIcebergDataReader implements
OptimizingDataReader {
this.reuseContainer = reuseContainer;
this.input = rewriteFilesInput;
this.deleteFilter =
- new GenericDeleteFilter(rewriteFilesInput, tableSchema,
structLikeCollections);
+ new GenericDeleteFilter(rewriteFilesInput, tableSchema,
structLikeCollections, deleteGroup);
}
@Override
@@ -301,8 +302,9 @@ public class GenericCombinedIcebergDataReader implements
OptimizingDataReader {
public GenericDeleteFilter(
RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
- StructLikeCollections structLikeCollections) {
- super(rewriteFilesInput, tableSchema, structLikeCollections);
+ StructLikeCollections structLikeCollections,
+ String deleteGroup) {
+ super(rewriteFilesInput, tableSchema, structLikeCollections,
deleteGroup);
}
@Override
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
index 931fadc07..4446619f9 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
@@ -68,6 +68,8 @@ public abstract class AbstractRewriteFilesExecutor
protected final RewriteFilesInput input;
+ protected final Map<String, String> properties;
+
protected MixedTable table;
protected OptimizingDataReader dataReader;
@@ -77,11 +79,12 @@ public abstract class AbstractRewriteFilesExecutor
protected StructLikeCollections structLikeCollections;
public AbstractRewriteFilesExecutor(
- RewriteFilesInput input, MixedTable table, StructLikeCollections
structLikeCollections) {
+ RewriteFilesInput input, MixedTable table, Map<String, String>
properties) {
this.input = input;
this.table = table;
this.io = table.io();
- this.structLikeCollections = structLikeCollections;
+ this.properties = properties;
+ this.structLikeCollections =
TaskProperties.getStructLikeCollections(properties);
dataReader = dataReader();
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
index 10cbe1a26..606be5754 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
@@ -22,7 +22,6 @@ import
org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
import org.apache.amoro.io.writer.GenericIcebergPartitionedFanoutWriter;
import org.apache.amoro.io.writer.IcebergFanoutPosDeleteWriter;
import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.utils.map.StructLikeCollections;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.TableProperties;
@@ -37,18 +36,19 @@ import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import java.util.Arrays;
+import java.util.Map;
import java.util.UUID;
/** OptimizingExecutor for iceberg format. */
public class IcebergRewriteExecutor extends AbstractRewriteFilesExecutor {
-
public IcebergRewriteExecutor(
- RewriteFilesInput input, MixedTable table, StructLikeCollections
structLikeCollections) {
- super(input, table, structLikeCollections);
+ RewriteFilesInput input, MixedTable table, Map<String, String>
properties) {
+ super(input, table, properties);
}
@Override
protected OptimizingDataReader dataReader() {
+ String processId = TaskProperties.getProcessId(properties);
return new GenericCombinedIcebergDataReader(
io,
table.schema(),
@@ -59,7 +59,8 @@ public class IcebergRewriteExecutor extends
AbstractRewriteFilesExecutor {
IdentityPartitionConverters::convertConstant,
false,
structLikeCollections,
- input);
+ input,
+ processId);
}
@Override
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
index 48be84c01..fd60ab9ef 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutorFactory.java
@@ -32,9 +32,7 @@ public class IcebergRewriteExecutorFactory implements
OptimizingExecutorFactory<
}
@Override
- public OptimizingExecutor createExecutor(RewriteFilesInput input) {
- OptimizingInputProperties optimizingConfig =
OptimizingInputProperties.parse(properties);
- return new IcebergRewriteExecutor(
- input, input.getTable(), optimizingConfig.getStructLikeCollections());
+ public OptimizingExecutor<RewriteFilesOutput>
createExecutor(RewriteFilesInput input) {
+ return new IcebergRewriteExecutor(input, input.getTable(), properties);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
index 02861ba67..f274e300a 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java
@@ -22,7 +22,6 @@ import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.io.writer.GenericTaskWriters;
import org.apache.amoro.io.writer.MixedTreeNodePosDeleteWriter;
import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.utils.map.StructLikeCollections;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.DeleteWriteResult;
@@ -31,15 +30,13 @@ import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.TaskWriter;
import java.util.List;
+import java.util.Map;
public class MixedIcebergRewriteExecutor extends AbstractRewriteFilesExecutor {
public MixedIcebergRewriteExecutor(
- RewriteFilesInput input,
- MixedTable table,
- StructLikeCollections structLikeCollections,
- String outputDir) {
- super(input, table, structLikeCollections);
+ RewriteFilesInput input, MixedTable table, Map<String, String>
properties) {
+ super(input, table, properties);
}
@Override
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
index 29b92c9bb..0728f7aee 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutorFactory.java
@@ -33,11 +33,6 @@ public class MixedIcebergRewriteExecutorFactory
@Override
public OptimizingExecutor<RewriteFilesOutput>
createExecutor(RewriteFilesInput input) {
- OptimizingInputProperties optimizingConfig =
OptimizingInputProperties.parse(properties);
- return new MixedIcebergRewriteExecutor(
- input,
- input.getTable(),
- optimizingConfig.getStructLikeCollections(),
- optimizingConfig.getOutputDir());
+ return new MixedIcebergRewriteExecutor(input, input.getTable(),
properties);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
index c408f970b..e0dfe2f8f 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingExecutorFactory.java
@@ -32,5 +32,5 @@ public interface OptimizingExecutorFactory<I extends
TableOptimizing.OptimizingI
void initialize(Map<String, String> properties);
/** Create factory by input */
- OptimizingExecutor createExecutor(I input);
+ OptimizingExecutor<?> createExecutor(I input);
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingInputProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingInputProperties.java
deleted file mode 100644
index b48875029..000000000
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/OptimizingInputProperties.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.optimizing;
-
-import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.utils.map.StructLikeCollections;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class OptimizingInputProperties {
-
- public static final String ENABLE_SPILL_MAP = "enable_spill_map";
-
- public static final String MAX_IN_MEMORY_SIZE_IN_BYTES =
"max_size_in_memory";
-
- public static final String SPILL_MAP_PATH = "spill_map_path";
-
- public static final String OUTPUT_DIR = "output_location";
-
- public static final String MOVE_FILE_TO_HIVE_LOCATION =
"move-files-to-hive-location";
-
- public static final String TASK_EXECUTOR_FACTORY_IMPL =
"task-executor-factory-impl";
-
- private final Map<String, String> properties;
-
- private OptimizingInputProperties(Map<String, String> properties) {
- this.properties = Maps.newHashMap(properties);
- }
-
- public OptimizingInputProperties() {
- properties = new HashMap<>();
- }
-
- public static OptimizingInputProperties parse(Map<String, String>
properties) {
- return new OptimizingInputProperties(properties);
- }
-
- public OptimizingInputProperties enableSpillMap() {
- properties.put(ENABLE_SPILL_MAP, "true");
- return this;
- }
-
- public OptimizingInputProperties setMaxSizeInMemory(long maxSizeInMemory) {
- properties.put(MAX_IN_MEMORY_SIZE_IN_BYTES,
String.valueOf(maxSizeInMemory));
- return this;
- }
-
- public OptimizingInputProperties setSpillMapPath(String path) {
- properties.put(SPILL_MAP_PATH, path);
- return this;
- }
-
- public OptimizingInputProperties setOutputDir(String outputDir) {
- properties.put(OUTPUT_DIR, outputDir);
- return this;
- }
-
- public OptimizingInputProperties setExecutorFactoryImpl(String
executorFactoryImpl) {
- properties.put(TASK_EXECUTOR_FACTORY_IMPL, executorFactoryImpl);
- return this;
- }
-
- public OptimizingInputProperties needMoveFile2HiveLocation() {
- properties.put(MOVE_FILE_TO_HIVE_LOCATION, "true");
- return this;
- }
-
- public StructLikeCollections getStructLikeCollections() {
- String enableSpillMapStr = properties.get(ENABLE_SPILL_MAP);
- boolean enableSpillMap = Boolean.parseBoolean(enableSpillMapStr);
-
- String maxInMemoryStr = properties.get(MAX_IN_MEMORY_SIZE_IN_BYTES);
- Long maxInMemory = maxInMemoryStr == null ? null :
Long.parseLong(maxInMemoryStr);
-
- String spillMapPath = properties.get(SPILL_MAP_PATH);
-
- return new StructLikeCollections(enableSpillMap, maxInMemory,
spillMapPath);
- }
-
- public String getOutputDir() {
- return properties.get(OUTPUT_DIR);
- }
-
- public String getExecutorFactoryImpl() {
- return properties.get(TASK_EXECUTOR_FACTORY_IMPL);
- }
-
- public boolean getMoveFile2HiveLocation() {
- String s = properties.get(MOVE_FILE_TO_HIVE_LOCATION);
- if (StringUtils.isBlank(s)) {
- return false;
- }
- return Boolean.parseBoolean(s);
- }
-
- public Map<String, String> getProperties() {
- return properties;
- }
-}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/TaskProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/TaskProperties.java
new file mode 100644
index 000000000..6a88da6b3
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/TaskProperties.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing;
+
+import org.apache.amoro.OptimizerProperties;
+import org.apache.amoro.utils.PropertyUtil;
+import org.apache.amoro.utils.map.StructLikeCollections;
+
+import java.util.Map;
+
+public class TaskProperties {
+
+ public static final String TASK_EXECUTOR_FACTORY_IMPL =
"task-executor-factory-impl";
+
+ public static final String PROCESS_ID = "process-id";
+ public static final String UNKNOWN_PROCESS_ID = "unknown";
+
+ public static final String EXTEND_DISK_STORAGE =
+ OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE;
+ public static final boolean EXTEND_DISK_STORAGE_DEFAULT = false;
+
+ public static final String MEMORY_STORAGE_SIZE =
+ OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE;
+ public static final long MEMORY_STORAGE_SIZE_DEFAULT = 512 * 1024 * 1024;
+
+ public static final String DISK_STORAGE_PATH =
OptimizerProperties.OPTIMIZER_DISK_STORAGE_PATH;
+
+ public static final String OUTPUT_DIR = "output_location";
+
+ public static final String MOVE_FILE_TO_HIVE_LOCATION =
"move-files-to-hive-location";
+
+ public static StructLikeCollections getStructLikeCollections(Map<String,
String> properties) {
+ boolean enableSpillMap =
+ PropertyUtil.propertyAsBoolean(
+ properties, EXTEND_DISK_STORAGE, EXTEND_DISK_STORAGE_DEFAULT);
+ long maxInMemory =
+ PropertyUtil.propertyAsLong(properties, MEMORY_STORAGE_SIZE,
MEMORY_STORAGE_SIZE_DEFAULT);
+ String spillMapPath = properties.get(DISK_STORAGE_PATH);
+
+ return new StructLikeCollections(enableSpillMap, maxInMemory,
spillMapPath);
+ }
+
+ public static String getProcessId(Map<String, String> properties) {
+ String processId = properties.get(PROCESS_ID);
+ if (processId == null || processId.trim().isEmpty()) {
+ processId = UNKNOWN_PROCESS_ID;
+ }
+ return processId;
+ }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
index 80634586f..0c0c6a422 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
@@ -21,7 +21,6 @@ package org.apache.amoro.optimizing.plan;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.HealthScoreInfo;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteStageTask;
@@ -168,7 +167,7 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
protected abstract TaskSplitter buildTaskSplitter();
- protected abstract OptimizingInputProperties buildTaskProperties();
+ protected abstract Map<String, String> buildTaskProperties();
protected void markSequence(long sequence) {
if (fromSequence == null || fromSequence > sequence) {
@@ -305,7 +304,7 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
return rewritePosDataFiles;
}
- public RewriteStageTask buildTask(OptimizingInputProperties properties) {
+ public RewriteStageTask buildTask(Map<String, String> properties) {
Set<ContentFile<?>> readOnlyDeleteFiles = Sets.newHashSet();
Set<ContentFile<?>> rewriteDeleteFiles = Sets.newHashSet();
for (ContentFile<?> deleteFile : deleteFiles) {
@@ -325,8 +324,7 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
PartitionSpec spec =
MixedTableUtil.getMixedTablePartitionSpecById(tableObject,
partition.first());
String partitionPath = spec.partitionToPath(partition.second());
- return new RewriteStageTask(
- identifier.getId(), partitionPath, input,
properties.getProperties());
+ return new RewriteStageTask(identifier.getId(), partitionPath, input,
properties);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
index 03ba74cca..998d08624 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
@@ -21,12 +21,14 @@ package org.apache.amoro.optimizing.plan;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Pair;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class IcebergPartitionPlan extends AbstractPartitionPlan {
@@ -55,9 +57,10 @@ public class IcebergPartitionPlan extends
AbstractPartitionPlan {
}
@Override
- protected OptimizingInputProperties buildTaskProperties() {
- OptimizingInputProperties properties = new OptimizingInputProperties();
-
properties.setExecutorFactoryImpl(IcebergRewriteExecutorFactory.class.getName());
+ protected Map<String, String> buildTaskProperties() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
IcebergRewriteExecutorFactory.class.getName());
return properties;
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
index ae38b8758..5d77f76ff 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.DataTreeNode;
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -90,9 +90,11 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
}
@Override
- protected OptimizingInputProperties buildTaskProperties() {
- OptimizingInputProperties properties = new OptimizingInputProperties();
-
properties.setExecutorFactoryImpl(MixedIcebergRewriteExecutorFactory.class.getName());
+ protected Map<String, String> buildTaskProperties() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
+ MixedIcebergRewriteExecutorFactory.class.getName());
return properties;
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
index 338108dab..ba002752e 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/SimpleMap.java
@@ -19,12 +19,27 @@
package org.apache.amoro.utils.map;
import java.io.Closeable;
+import java.util.Objects;
+import java.util.function.BiFunction;
-public interface SimpleMap<T, K> extends Closeable {
+public interface SimpleMap<K, V> extends Closeable {
- void put(T key, K value);
+ void put(K key, V value);
- void delete(T key);
+ void delete(K key);
- K get(T key);
+ V get(K key);
+
+ default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends
V> remappingFunction) {
+ Objects.requireNonNull(remappingFunction);
+ Objects.requireNonNull(value);
+ V oldValue = get(key);
+ V newValue = (oldValue == null) ? value :
remappingFunction.apply(oldValue, value);
+ if (newValue == null) {
+ delete(key);
+ } else {
+ put(key, newValue);
+ }
+ return newValue;
+ }
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
index 487922709..76f8c74c1 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeBaseMap.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;
import java.io.IOException;
+import java.util.function.BiFunction;
public abstract class StructLikeBaseMap<T> implements SimpleMap<StructLike, T>
{
@@ -54,6 +55,12 @@ public abstract class StructLikeBaseMap<T> implements
SimpleMap<StructLike, T> {
wrapper.set(null); // don't hold a reference to the key.
}
+ @Override
+ public T merge(
+ StructLike key, T value, BiFunction<? super T, ? super T, ? extends T>
remappingFunction) {
+ return getInternalMap().merge(structLikeWrapper.copyFor(key), value,
remappingFunction);
+ }
+
@Override
public void close() throws IOException {
getInternalMap().close();
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
index c357d697e..85957a405 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/map/StructLikeMemoryMap.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;
import java.util.HashMap;
+import java.util.function.BiFunction;
/** Map implementation for {@link StructLikeWrapper} as the key based on
memory. */
public class StructLikeMemoryMap<T> extends StructLikeBaseMap<T> {
@@ -62,6 +63,14 @@ public class StructLikeMemoryMap<T> extends
StructLikeBaseMap<T> {
return map.get(key);
}
+ @Override
+ public T merge(
+ StructLikeWrapper key,
+ T value,
+ BiFunction<? super T, ? super T, ? extends T> remappingFunction) {
+ return map.merge(key, value, remappingFunction);
+ }
+
@Override
public void close() {
// do nothing and gc will discard it
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
index 01eaf4180..442efaf74 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
@@ -23,6 +23,7 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.TableTestBase;
import org.apache.amoro.io.reader.CombinedDeleteFilter;
+import org.apache.amoro.io.reader.DeleteCache;
import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
@@ -69,19 +70,35 @@ public class TestIcebergCombinedReader extends
TableTestBase {
private RewriteFilesInput filterEqDeleteScanTask;
- public TestIcebergCombinedReader(boolean partitionedTable, FileFormat
fileFormat) {
+ public TestIcebergCombinedReader(
+ boolean partitionedTable, FileFormat fileFormat, boolean
deleteCacheEnabled) {
super(
new BasicCatalogTestHelper(TableFormat.ICEBERG),
new BasicTableTestHelper(false, partitionedTable,
buildTableProperties(fileFormat)));
this.fileFormat = fileFormat;
+ if (deleteCacheEnabled) {
+ System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "true");
+ } else {
+ System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "false");
+ }
}
- @Parameterized.Parameters(name = "partitionedTable = {0}, fileFormat = {1}")
+ @Parameterized.Parameters(
+ name = "partitionedTable = {0}, fileFormat = {1}, deleteCacheEnabled =
{2}")
public static Object[][] parameters() {
return new Object[][] {
- {true, FileFormat.PARQUET}, {false, FileFormat.PARQUET},
- {true, FileFormat.AVRO}, {false, FileFormat.AVRO},
- {true, FileFormat.ORC}, {false, FileFormat.ORC}
+ {true, FileFormat.PARQUET, true},
+ {false, FileFormat.PARQUET, true},
+ {true, FileFormat.AVRO, true},
+ {false, FileFormat.AVRO, true},
+ {true, FileFormat.ORC, true},
+ {false, FileFormat.ORC, true},
+ {true, FileFormat.PARQUET, false},
+ {false, FileFormat.PARQUET, false},
+ {true, FileFormat.AVRO, false},
+ {false, FileFormat.AVRO, false},
+ {true, FileFormat.ORC, false},
+ {false, FileFormat.ORC, false}
};
}
@@ -197,7 +214,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- scanTask);
+ scanTask,
+ "");
try (CloseableIterable<Record> records = dataReader.readData()) {
Assert.assertEquals(1, Iterables.size(records));
Record record = Iterables.getFirst(records, null);
@@ -219,7 +237,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- scanTask);
+ scanTask,
+ "");
try (CloseableIterable<Record> records = dataReader.readDeletedData()) {
Assert.assertEquals(2, Iterables.size(records));
Record first = Iterables.getFirst(records, null);
@@ -243,7 +262,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- dataScanTask);
+ dataScanTask,
+ "");
try (CloseableIterable<Record> records = dataReader.readData()) {
Assert.assertEquals(3, Iterables.size(records));
}
@@ -263,7 +283,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- dataScanTask);
+ dataScanTask,
+ "");
try (CloseableIterable<Record> records = dataReader.readDeletedData()) {
Assert.assertEquals(0, Iterables.size(records));
}
@@ -284,7 +305,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- filterEqDeleteScanTask);
+ filterEqDeleteScanTask,
+ "");
Assert.assertTrue(dataReader.getDeleteFilter().isFilterEqDelete());
@@ -363,7 +385,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- task2);
+ task2,
+ "");
try (CloseableIterable<Record> readRecords = dataReader.readData()) {
Assert.assertEquals(1, Iterables.size(readRecords));
}
@@ -437,7 +460,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- task);
+ task,
+ "");
try (CloseableIterable<Record> readRecords = dataReader.readData()) {
Assert.assertEquals(0, Iterables.size(readRecords));
}
@@ -511,7 +535,8 @@ public class TestIcebergCombinedReader extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- task);
+ task,
+ "");
try (CloseableIterable<Record> readRecords = dataReader.readData()) {
Assert.assertEquals(0, Iterables.size(readRecords));
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
index 551d1d76e..40af54ae3 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReaderVariousTypes.java
@@ -146,7 +146,8 @@ public class TestIcebergCombinedReaderVariousTypes extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- input)
+ input,
+ "")
.readData();
Assert.assertEquals(Iterables.size(readData), 1);
@@ -197,7 +198,8 @@ public class TestIcebergCombinedReaderVariousTypes extends
TableTestBase {
IdentityPartitionConverters::convertConstant,
false,
null,
- input);
+ input,
+ "");
Assert.assertTrue(reader.getDeleteFilter().isFilterEqDelete());
CloseableIterable<Record> readData = reader.readData();
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
index d66c399a8..37b79036c 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
@@ -27,7 +27,6 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
-import org.apache.amoro.utils.map.StructLikeCollections;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
@@ -168,7 +167,7 @@ public class IcebergRewriteExecutorTest extends
TableTestBase {
@Test
public void readAllData() throws IOException {
IcebergRewriteExecutor executor =
- new IcebergRewriteExecutor(scanTask, getMixedTable(),
StructLikeCollections.DEFAULT);
+ new IcebergRewriteExecutor(scanTask, getMixedTable(),
Collections.emptyMap());
RewriteFilesOutput output = executor.execute();
@@ -212,7 +211,7 @@ public class IcebergRewriteExecutorTest extends
TableTestBase {
@Test
public void readOnlyData() throws IOException {
IcebergRewriteExecutor executor =
- new IcebergRewriteExecutor(dataScanTask, getMixedTable(),
StructLikeCollections.DEFAULT);
+ new IcebergRewriteExecutor(dataScanTask, getMixedTable(),
Collections.emptyMap());
RewriteFilesOutput output = executor.execute();
diff --git
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
index b37a6f34b..2c32301e4 100644
---
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
+++
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java
@@ -24,9 +24,9 @@ import
org.apache.amoro.io.writer.MixedTreeNodePosDeleteWriter;
import org.apache.amoro.optimizing.AbstractRewriteFilesExecutor;
import org.apache.amoro.optimizing.OptimizingDataReader;
import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.WriteOperationKind;
-import org.apache.amoro.utils.map.StructLikeCollections;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
@@ -36,6 +36,7 @@ import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.TaskWriter;
import java.util.List;
+import java.util.Map;
/** OptimizingExecutor form mixed format */
public class MixedHiveRewriteExecutor extends AbstractRewriteFilesExecutor {
@@ -43,12 +44,9 @@ public class MixedHiveRewriteExecutor extends
AbstractRewriteFilesExecutor {
private final String outputDir;
public MixedHiveRewriteExecutor(
- RewriteFilesInput input,
- MixedTable table,
- StructLikeCollections structLikeCollections,
- String outputDir) {
- super(input, table, structLikeCollections);
- this.outputDir = outputDir;
+ RewriteFilesInput input, MixedTable table, Map<String, String>
properties) {
+ super(input, table, properties);
+ this.outputDir = properties.get(TaskProperties.OUTPUT_DIR);
}
@Override
diff --git
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
index 9d79c935e..3e94af5d6 100644
---
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
+++
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutorFactory.java
@@ -20,8 +20,8 @@ package org.apache.amoro.hive.optimizing;
import org.apache.amoro.optimizing.OptimizingExecutor;
import org.apache.amoro.optimizing.OptimizingExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import java.util.Map;
@@ -38,12 +38,7 @@ public class MixedHiveRewriteExecutorFactory
}
@Override
- public OptimizingExecutor createExecutor(RewriteFilesInput input) {
- OptimizingInputProperties optimizingConfig =
OptimizingInputProperties.parse(properties);
- return new MixedHiveRewriteExecutor(
- input,
- input.getTable(),
- optimizingConfig.getStructLikeCollections(),
- optimizingConfig.getOutputDir());
+ public OptimizingExecutor<RewriteFilesOutput>
createExecutor(RewriteFilesInput input) {
+ return new MixedHiveRewriteExecutor(input, input.getTable(), properties);
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
index 004843b7f..ac17e565c 100644
---
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
+++
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
@@ -24,7 +24,7 @@ import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
import org.apache.amoro.hive.utils.HiveTableUtil;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.optimizing.plan.CommonPartitionEvaluator;
import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
import org.apache.amoro.optimizing.plan.PartitionEvaluator;
@@ -116,13 +116,14 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
}
@Override
- protected OptimizingInputProperties buildTaskProperties() {
- OptimizingInputProperties properties = super.buildTaskProperties();
-
properties.setExecutorFactoryImpl(MixedHiveRewriteExecutorFactory.class.getName());
+ protected Map<String, String> buildTaskProperties() {
+ Map<String, String> properties = super.buildTaskProperties();
+ properties.put(
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
MixedHiveRewriteExecutorFactory.class.getName());
if (moveFiles2CurrentHiveLocation()) {
- properties.needMoveFile2HiveLocation();
+ properties.put(TaskProperties.MOVE_FILE_TO_HIVE_LOCATION, "true");
} else if (evaluator().isFullNecessary()) {
- properties.setOutputDir(constructCustomHiveSubdirectory());
+ properties.put(TaskProperties.OUTPUT_DIR,
constructCustomHiveSubdirectory());
}
return properties;
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
index 7fd1c8b26..544d31f24 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
@@ -79,6 +79,30 @@ public class OptimizerConfig implements Serializable {
@Option(name = "-id", aliases = "--" + OptimizerProperties.RESOURCE_ID,
usage = "Resource id")
private String resourceId;
+ @Option(
+ name = "-ce",
+ aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_ENABLED,
+ usage = "Whether enable cache, default false")
+ private boolean cacheEnabled =
OptimizerProperties.OPTIMIZER_CACHE_ENABLED_DEFAULT;
+
+ @Option(
+ name = "-cmts",
+ aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE,
+ usage = "Max total size in cache, default 128MB")
+ private String cacheMaxTotalSize =
OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;
+
+ @Option(
+ name = "-cmes",
+ aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE,
+ usage = "Max entry size in cache, default 64MB")
+ private String cacheMaxEntrySize =
OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;
+
+ @Option(
+ name = "-ct",
+ aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT,
+      usage = "Timeout in cache, default 10 minutes")
+ private String cacheTimeout =
OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT_DEFAULT;
+
public OptimizerConfig() {}
public OptimizerConfig(String[] args) throws CmdLineException {
@@ -158,6 +182,38 @@ public class OptimizerConfig implements Serializable {
this.resourceId = resourceId;
}
+ public boolean isCacheEnabled() {
+ return cacheEnabled;
+ }
+
+ public void setCacheEnabled(boolean cacheEnabled) {
+ this.cacheEnabled = cacheEnabled;
+ }
+
+ public String getCacheMaxTotalSize() {
+ return cacheMaxTotalSize;
+ }
+
+ public void setCacheMaxTotalSize(String cacheMaxTotalSize) {
+ this.cacheMaxTotalSize = cacheMaxTotalSize;
+ }
+
+ public String getCacheMaxEntrySize() {
+ return cacheMaxEntrySize;
+ }
+
+ public void setCacheMaxEntrySize(String cacheMaxEntrySize) {
+ this.cacheMaxEntrySize = cacheMaxEntrySize;
+ }
+
+ public String getCacheTimeout() {
+ return cacheTimeout;
+ }
+
+ public void setCacheTimeout(String cacheTimeout) {
+ this.cacheTimeout = cacheTimeout;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -167,9 +223,13 @@ public class OptimizerConfig implements Serializable {
.add("groupName", groupName)
.add("heartBeat", heartBeat)
.add("extendDiskStorage", extendDiskStorage)
- .add("rocksDBBasePath", diskStoragePath)
+ .add("diskStoragePath", diskStoragePath)
.add("memoryStorageSize", memoryStorageSize)
.add("resourceId", resourceId)
+ .add("cacheEnabled", cacheEnabled)
+ .add("cacheMaxTotalSize", cacheMaxTotalSize)
+ .add("cacheMaxEntrySize", cacheMaxEntrySize)
+ .add("cacheTimeout", cacheTimeout)
.toString();
}
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index d40e7ec94..6b1dc0a6c 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -20,10 +20,12 @@ package org.apache.amoro.optimizer.common;
import org.apache.amoro.api.OptimizingTask;
import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.io.reader.DeleteCache;
import org.apache.amoro.optimizing.OptimizingExecutor;
import org.apache.amoro.optimizing.OptimizingExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.amoro.utils.ExceptionUtil;
import org.apache.amoro.utils.SerializationUtil;
@@ -32,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.Map;
public class OptimizerExecutor extends AbstractOptimizerOperator {
@@ -147,24 +150,18 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
public static OptimizingTaskResult executeTask(
OptimizerConfig config, int threadId, OptimizingTask task, Logger
logger) {
long startTime = System.currentTimeMillis();
- TableOptimizing.OptimizingInput input = null;
+ TableOptimizing.OptimizingInput input;
try {
- OptimizingInputProperties properties =
OptimizingInputProperties.parse(task.getProperties());
+ Map<String, String> taskProperties = fillTaskProperties(config, task);
input = SerializationUtil.simpleDeserialize(task.getTaskInput());
- String executorFactoryImpl = properties.getExecutorFactoryImpl();
+ String executorFactoryImpl =
taskProperties.get(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL);
DynConstructors.Ctor<OptimizingExecutorFactory> ctor =
DynConstructors.builder(OptimizingExecutorFactory.class)
.impl(executorFactoryImpl)
.buildChecked();
OptimizingExecutorFactory factory = ctor.newInstance();
- if (config.isExtendDiskStorage()) {
- properties.enableSpillMap();
- }
- properties.setMaxSizeInMemory(config.getMemoryStorageSize() * 1024 *
1024);
- properties.setSpillMapPath(config.getDiskStoragePath());
- factory.initialize(properties.getProperties());
-
+ factory.initialize(taskProperties);
OptimizingExecutor executor = factory.createExecutor(input);
TableOptimizing.OptimizingOutput output = executor.execute();
ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output);
@@ -190,4 +187,31 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
return errorResult;
}
}
+
+ private static Map<String, String> fillTaskProperties(
+ OptimizerConfig config, OptimizingTask task) {
+ if (config.isCacheEnabled()) {
+ System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "true");
+ }
+ if
(!config.getCacheMaxEntrySize().equals(DeleteCache.DELETE_CACHE_MAX_ENTRY_SIZE_DEFAULT))
{
+ System.setProperty(DeleteCache.DELETE_CACHE_MAX_ENTRY_SIZE,
config.getCacheMaxEntrySize());
+ }
+ if
(!config.getCacheMaxTotalSize().equals(DeleteCache.DELETE_CACHE_MAX_TOTAL_SIZE_DEFAULT))
{
+      System.setProperty(
+          DeleteCache.DELETE_CACHE_MAX_TOTAL_SIZE,
config.getCacheMaxTotalSize());
+ }
+    if (!config.getCacheTimeout().equals(DeleteCache.DELETE_CACHE_TIMEOUT_DEFAULT)) {
+ System.setProperty(DeleteCache.DELETE_CACHE_TIMEOUT,
config.getCacheTimeout());
+ }
+ Map<String, String> properties = Maps.newHashMap(task.getProperties());
+ properties.put(TaskProperties.PROCESS_ID,
String.valueOf(task.getTaskId().getProcessId()));
+ if (config.isExtendDiskStorage()) {
+ properties.put(TaskProperties.EXTEND_DISK_STORAGE, "true");
+ }
+ properties.put(
+ TaskProperties.MEMORY_STORAGE_SIZE,
+ String.valueOf(config.getMemoryStorageSize() * 1024 * 1024));
+ properties.put(TaskProperties.DISK_STORAGE_PATH,
config.getDiskStoragePath());
+ return properties;
+ }
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
index 6909ddc02..5bacb7364 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java
@@ -25,8 +25,8 @@ import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.optimizing.BaseOptimizingInput;
import org.apache.amoro.optimizing.OptimizingExecutor;
import org.apache.amoro.optimizing.OptimizingExecutorFactory;
-import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.amoro.utils.SerializationUtil;
@@ -129,8 +129,7 @@ public class TestOptimizerExecutor extends
OptimizerTestBase {
optimizingTask.setTaskInput(SerializationUtil.simpleSerialize(this));
Map<String, String> inputProperties = Maps.newHashMap();
inputProperties.put(
- OptimizingInputProperties.TASK_EXECUTOR_FACTORY_IMPL,
- TestOptimizingExecutorFactory.class.getName());
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
TestOptimizingExecutorFactory.class.getName());
optimizingTask.setProperties(inputProperties);
return optimizingTask;
}
diff --git a/docs/admin-guides/managing-optimizers.md
b/docs/admin-guides/managing-optimizers.md
index 61d86ab61..aebea60bd 100644
--- a/docs/admin-guides/managing-optimizers.md
+++ b/docs/admin-guides/managing-optimizers.md
@@ -282,9 +282,13 @@ The optimizer group supports the following properties:
| Property | Container type | Required | Default
|
Description
[...]
|--------------------------------|----------------|----------|---------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| scheduling-policy | All | No | quota
| The
scheduler group scheduling policy, the default value is `quota`, it will be
scheduled according to the quota resources configured for each table, the
larger the table quota is, the more optimizer resources it can take. There is
also a configuration `balanced` that will balance the scheduling of each table,
the longer the table has not [...]
-| memory | Local | Yes | N/A
| The max
memory of JVM for local optimizer, in MBs.
[...]
| max-input-file-size-per-thread | All | No |
536870912(512MB)
| Max input file size per optimize thread.
[...]
| ams-optimizing-uri | All | No |
thrift://{ams.server-expose-host}:{ams.thrift-server.optimizing-service.binding-port}
| Table optimizing service endpoint. This is used when the default service
endpoint is not visitable.
[...]
+| cache-enabled | All | No | false
| Whether
enable cache in optimizer.
[...]
+| cache-max-total-size | All | No | 128mb
| Max
total size in optimizer cache.
[...]
+| cache-max-entry-size | All | No | 64mb
| Max
entry size in optimizer cache.
[...]
+| cache-timeout | All | No | 10min
| Timeout
in optimizer cache.
[...]
+| memory | Local | Yes | N/A
| The max
memory of JVM for local optimizer, in MBs.
[...]
| flink-conf.\<key\> | Flink | No | N/A
| Any
flink config options could be overwritten, priority is optimizing-group >
optimizing-container > flink-conf.yaml.
[...]
| spark-conf.\<key\> | Spark | No | N/A
| Any
spark config options could be overwritten, priority is optimizing-group >
optimizing-container > spark-defaults.conf.
[...]
@@ -351,6 +355,10 @@ The description of the relevant parameters is shown in the
following table:
| -eds | No | Whether extend storage to disk, default false.
|
| -dsp | No | Defines the directory where the storage files are
saved, the default temporary-file directory is specified by the system property
`java.io.tmpdir`. On UNIX systems the default value of this property is
typically "/tmp" or "/var/tmp". |
| -msz | No | Memory storage size limit when extending disk
storage(MB), default 512(MB).
|
+| -ce | No | Whether enable cache in optimizer, default false.
|
| -cmts | No | Max total size in optimizer cache, default 128MB.
|
+| -cmes | No | Max entry size in optimizer cache, default 64MB.
|
+| -ct | No | Timeout in optimizer cache, default 10Min.
|
Or you can submit optimizer in your own Spark task development platform or
local Spark environment with the following configuration. The main parameters
include:
@@ -378,3 +386,7 @@ The description of the relevant parameters is shown in the
following table:
| -eds | No | Whether extend storage to disk, default false.
|
| -dsp | No | Defines the directory where the storage files are
saved, the default temporary-file directory is specified by the system property
`java.io.tmpdir`. On UNIX systems the default value of this property is
typically "/tmp" or "/var/tmp". |
| -msz | No | Memory storage size limit when extending disk
storage(MB), default 512(MB).
|
+| -ce | No | Whether enable cache in optimizer, default false.
|
| -cmts | No | Max total size in optimizer cache, default 128MB.
|
+| -cmes | No | Max entry size in optimizer cache, default 64MB.
|
+| -ct | No | Timeout in optimizer cache, default 10Min.
|