This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new ab713b151 [AMORO-3775] Add support for metric-based refresh event
trigger in TableRuntimeRefreshExecutor (#3776)
ab713b151 is described below
commit ab713b1513cb98ea325a3ef3b5ff67dce4a89e85
Author: Jzjsnow <[email protected]>
AuthorDate: Mon Dec 22 16:46:44 2025 +0800
[AMORO-3775] Add support for metric-based refresh event trigger in
TableRuntimeRefreshExecutor (#3776)
[AMORO-3775] Add support for metadata-based refresh event in
TableRuntimeRefreshExecutor
---
.../inline/TableRuntimeRefreshExecutor.java | 15 +-
.../amoro/server/table/DefaultTableRuntime.java | 1 +
.../amoro/server/table/TableConfigurations.java | 12 +-
.../amoro/server/utils/IcebergTableUtil.java | 38 +-
.../plan/TestHiveKeyedPartitionPlan.java | 3 +-
.../plan/TestHiveUnkeyedPartitionPlan.java | 3 +-
.../optimizing/plan/TestIcebergPartitionPlan.java | 3 +-
.../optimizing/plan/TestKeyedPartitionPlan.java | 3 +-
.../optimizing/plan/TestUnkeyedPartitionPlan.java | 3 +-
.../org/apache/amoro/config/OptimizingConfig.java | 38 +-
.../evaluation/MetadataBasedEvaluationEvent.java | 103 ++++
.../MixedAndIcebergTableStatsProvider.java | 62 +++
.../optimizing/evaluation/TableStatsProvider.java | 46 ++
.../plan/AbstractOptimizingEvaluator.java | 5 +-
.../optimizing/plan/AbstractOptimizingPlanner.java | 6 +-
.../optimizing/plan/AbstractPartitionPlan.java | 13 +-
.../optimizing/plan/CommonPartitionEvaluator.java | 57 ++-
.../optimizing/plan/IcebergOptimizerEvaluator.java | 9 +-
.../optimizing/plan/IcebergOptimizingPlanner.java | 9 +-
.../optimizing/plan/IcebergPartitionPlan.java | 6 +-
.../plan/MixedIcebergOptimizingEvaluator.java | 9 +-
.../plan/MixedIcebergOptimizingPlanner.java | 9 +-
.../optimizing/plan/MixedIcebergPartitionPlan.java | 20 +-
.../org/apache/amoro/table/TableProperties.java | 13 +
.../TestMetadataBasedEvaluationEvent.java | 549 +++++++++++++++++++++
.../plan/MixedHiveOptimizingEvaluator.java | 9 +-
.../plan/MixedHiveOptimizingPlanner.java | 9 +-
.../optimizing/plan/MixedHivePartitionPlan.java | 15 +-
28 files changed, 1017 insertions(+), 51 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index bbc63125b..18071a2cd 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -20,7 +20,9 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingProcess;
@@ -60,8 +62,19 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime,
MixedTable table) {
// only evaluate pending input when optimizing is enabled and in idle state
- if (tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()
+ OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
+ if (optimizingConfig.isEnabled()
&& tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
+
+ if (optimizingConfig.isMetadataBasedTriggerEnabled()
+ && !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
+ optimizingConfig, table, tableRuntime.getLastPlanTime())) {
+ logger.debug(
+ "{} optimizing is not necessary due to metadata based trigger",
+ tableRuntime.getTableIdentifier());
+ return;
+ }
+
AbstractOptimizingEvaluator evaluator =
IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table,
maxPendingPartitions);
if (evaluator.isNecessary()) {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index c2532a6bc..c1506218d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -554,6 +554,7 @@ public class DefaultTableRuntime extends
AbstractTableRuntime
currentChangeSnapshotId);
state.setCurrentChangeSnapshotId(currentChangeSnapshotId);
state.setCurrentSnapshotId(currentSnapshotId);
+ return true;
}
} else {
long currentSnapshotId = doRefreshSnapshots((UnkeyedTable) table);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index f5b239ff4..c891a24f4 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -331,7 +331,17 @@ public class TableConfigurations {
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL,
- TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT));
+ TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT))
+ .setEvaluationFallbackInterval(
+ PropertyUtil.propertyAsLong(
+ properties,
+ TableProperties.SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL,
+
TableProperties.SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL_DEFAULT))
+ .setEvaluationMseTolerance(
+ PropertyUtil.propertyAsLong(
+ properties,
+
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
+
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
}
/**
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
index f82dca86c..0b96dddd8 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
@@ -228,15 +228,37 @@ public class IcebergTableUtil {
OptimizingConfig config = tableRuntime.getOptimizingConfig();
long lastMinor = tableRuntime.getLastMinorOptimizingTime();
long lastFull = tableRuntime.getLastFullOptimizingTime();
+ long lastMajor = tableRuntime.getLastMajorOptimizingTime();
if (TableFormat.ICEBERG.equals(table.format())) {
return new IcebergOptimizerEvaluator(
- identifier, config, table, snapshot, maxPendingPartitions,
lastMinor, lastFull);
+ identifier,
+ config,
+ table,
+ snapshot,
+ maxPendingPartitions,
+ lastMinor,
+ lastFull,
+ lastMajor);
} else if (TableFormat.MIXED_ICEBERG.equals(table.format())) {
return new MixedIcebergOptimizingEvaluator(
- identifier, config, table, snapshot, maxPendingPartitions,
lastMinor, lastFull);
+ identifier,
+ config,
+ table,
+ snapshot,
+ maxPendingPartitions,
+ lastMinor,
+ lastFull,
+ lastMajor);
} else if (TableFormat.MIXED_HIVE.equals(table.format())) {
return new MixedHiveOptimizingEvaluator(
- identifier, config, table, snapshot, maxPendingPartitions,
lastMinor, lastFull);
+ identifier,
+ config,
+ table,
+ snapshot,
+ maxPendingPartitions,
+ lastMinor,
+ lastFull,
+ lastMajor);
}
throw new IllegalStateException("Un-supported table-format:" +
table.format().toString());
}
@@ -267,6 +289,7 @@ public class IcebergTableUtil {
OptimizingConfig config = tableRuntime.getOptimizingConfig();
long lastMinor = tableRuntime.getLastMinorOptimizingTime();
long lastFull = tableRuntime.getLastFullOptimizingTime();
+ long lastMajor = tableRuntime.getLastMajorOptimizingTime();
TableSnapshot snapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
if (TableFormat.ICEBERG.equals(table.format())) {
return new IcebergOptimizingPlanner(
@@ -279,7 +302,8 @@ public class IcebergTableUtil {
availableCore,
maxInputSizePerThread,
lastMinor,
- lastFull);
+ lastFull,
+ lastMajor);
} else if (TableFormat.MIXED_ICEBERG.equals(table.format())) {
return new MixedIcebergOptimizingPlanner(
identifier,
@@ -291,7 +315,8 @@ public class IcebergTableUtil {
availableCore,
maxInputSizePerThread,
lastMinor,
- lastFull);
+ lastFull,
+ lastMajor);
} else if (TableFormat.MIXED_HIVE.equals(table.format())) {
return new MixedHiveOptimizingPlanner(
identifier,
@@ -303,7 +328,8 @@ public class IcebergTableUtil {
availableCore,
maxInputSizePerThread,
lastMinor,
- lastFull);
+ lastFull,
+ lastMajor);
}
throw new IllegalStateException("Unsupported table format:" +
table.format().toString());
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
index 387b18231..b6eb39a10 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
@@ -91,7 +91,8 @@ public class TestHiveKeyedPartitionPlan extends
TestKeyedPartitionPlan {
hiveLocation,
System.currentTimeMillis(),
getTableRuntime().getLastMinorOptimizingTime(),
- getTableRuntime().getLastFullOptimizingTime());
+ getTableRuntime().getLastFullOptimizingTime(),
+ getTableRuntime().getLastMajorOptimizingTime());
}
@Test
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
index 2d0e4cfc1..cede17360 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
@@ -90,7 +90,8 @@ public class TestHiveUnkeyedPartitionPlan extends
TestUnkeyedPartitionPlan {
hiveLocation,
System.currentTimeMillis(),
getTableRuntime().getLastMinorOptimizingTime(),
- getTableRuntime().getLastFullOptimizingTime());
+ getTableRuntime().getLastFullOptimizingTime(),
+ getTableRuntime().getLastMajorOptimizingTime());
}
@Test
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
index 3d67b62c8..7ad399709 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
@@ -83,7 +83,8 @@ public class TestIcebergPartitionPlan extends
TestUnkeyedPartitionPlan {
getPartition(),
System.currentTimeMillis(),
tableRuntime.getLastMinorOptimizingTime(),
- tableRuntime.getLastFullOptimizingTime());
+ tableRuntime.getLastFullOptimizingTime(),
+ tableRuntime.getLastMajorOptimizingTime());
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
index 1984f31d8..d80a1e57e 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -222,7 +222,8 @@ public class TestKeyedPartitionPlan extends
MixedTablePlanTestBase {
getPartition(),
System.currentTimeMillis(),
getTableRuntime().getLastMinorOptimizingTime(),
- getTableRuntime().getLastFullOptimizingTime());
+ getTableRuntime().getLastFullOptimizingTime(),
+ getTableRuntime().getLastMajorOptimizingTime());
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
index 558ace42b..0ad80622c 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
@@ -88,7 +88,8 @@ public class TestUnkeyedPartitionPlan extends
MixedTablePlanTestBase {
getPartition(),
System.currentTimeMillis(),
getTableRuntime().getLastMinorOptimizingTime(),
- getTableRuntime().getLastFullOptimizingTime());
+ getTableRuntime().getLastFullOptimizingTime(),
+ getTableRuntime().getLastMajorOptimizingTime());
}
@Override
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
index c5f8ef77e..0c743ac6b 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
@@ -91,6 +91,12 @@ public class OptimizingConfig {
// self-optimizing.min-plan-interval
private long minPlanInterval;
+ // self-optimizing.evaluation.file-size.mse-tolerance
+ private long evaluationMseTolerance;
+
+ // self-optimizing.evaluation.fallback-interval
+ private long evaluationFallbackInterval;
+
public OptimizingConfig() {}
public boolean isEnabled() {
@@ -290,6 +296,28 @@ public class OptimizingConfig {
return this;
}
+ public long getEvaluationFallbackInterval() {
+ return evaluationFallbackInterval;
+ }
+
+ public OptimizingConfig setEvaluationFallbackInterval(long
evaluationFallbackInterval) {
+ this.evaluationFallbackInterval = evaluationFallbackInterval;
+ return this;
+ }
+
+ public boolean isMetadataBasedTriggerEnabled() {
+ return evaluationFallbackInterval >= 0;
+ }
+
+ public long getEvaluationMseTolerance() {
+ return evaluationMseTolerance;
+ }
+
+ public OptimizingConfig setEvaluationMseTolerance(long
evaluationMseTolerance) {
+ this.evaluationMseTolerance = evaluationMseTolerance;
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -320,7 +348,9 @@ public class OptimizingConfig {
&& baseRefreshInterval == that.baseRefreshInterval
&& hiveRefreshInterval == that.hiveRefreshInterval
&& Objects.equal(optimizerGroup, that.optimizerGroup)
- && Objects.equal(minPlanInterval, that.minPlanInterval);
+ && Objects.equal(minPlanInterval, that.minPlanInterval)
+ && Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance)
+ && Objects.equal(evaluationFallbackInterval,
that.evaluationFallbackInterval);
}
@Override
@@ -347,7 +377,9 @@ public class OptimizingConfig {
baseHashBucket,
baseRefreshInterval,
hiveRefreshInterval,
- minPlanInterval);
+ minPlanInterval,
+ evaluationMseTolerance,
+ evaluationFallbackInterval);
}
@Override
@@ -373,6 +405,8 @@ public class OptimizingConfig {
.add("baseHashBucket", baseHashBucket)
.add("baseRefreshInterval", baseRefreshInterval)
.add("hiveRefreshInterval", hiveRefreshInterval)
+ .add("evaluationMseTolerance", evaluationMseTolerance)
+ .add("evaluationFallbackInterval", evaluationFallbackInterval)
.toString();
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MetadataBasedEvaluationEvent.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MetadataBasedEvaluationEvent.java
new file mode 100644
index 000000000..d22d92bc9
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MetadataBasedEvaluationEvent.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import static org.apache.amoro.TableFormat.ICEBERG;
+import static org.apache.amoro.TableFormat.MIXED_HIVE;
+import static org.apache.amoro.TableFormat.MIXED_ICEBERG;
+
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.table.MixedTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataBasedEvaluationEvent {
+ private static final Logger logger =
LoggerFactory.getLogger(MetadataBasedEvaluationEvent.class);
+
+ public static boolean isReachFallbackInterval(OptimizingConfig config, long
lastPlanTime) {
+ long fallbackInterval = config.getEvaluationFallbackInterval();
+ return fallbackInterval >= 0 && System.currentTimeMillis() - lastPlanTime
>= fallbackInterval;
+ }
+
+ public static boolean isEvaluatingNecessary(
+ OptimizingConfig config, MixedTable table, long lastPlanTime) {
+ // Step 1: Perform periodic scheduling according to the fallback interval
to avoid false
+ // positives or
+ // missed triggers based on metadata metric-driven evaluation
+ if (isReachFallbackInterval(config, lastPlanTime)) {
+ logger.debug("Maximum interval for evaluating table {} has reached.",
table.id());
+ return true;
+ }
+
+ TableStatsProvider.BasicFileStats basicStats = getTableStats(table);
+ // Currently only supports ICEBERG and mixed formats (MIXED_ICEBERG,
MIXED_HIVE).
+ // For other formats this will return null.
+ if (basicStats != null) {
+ // Step 2: Empty table should skip evaluating pending input
+ // TableStatsProvider provider = getTableStatsProvider(table);
+ if (basicStats.dataFileCnt == 0) {
+ logger.debug("Table {} contains no data files, skip evaluating pending
input.", table.id());
+ return false;
+ }
+
+ // Step 3: If the condition `delete file=0 && avg file size > target
size * ratio` is
+ // satisfied,
+ // then evaluating the pending input is considered unnecessary and will
be skipped.
+ long minTargetSize = (long) (config.getTargetSize() *
config.getMinTargetSizeRatio());
+ double avgFileSize =
+ basicStats.dataFileCnt > 0
+ ? (double) basicStats.totalFileSize / basicStats.dataFileCnt
+ : 0;
+
+ if (basicStats.deleteFileCnt == 0 && avgFileSize > minTargetSize) {
+ logger.debug(
+ "Table {} contains only appended data and no deleted files
(average file size: {}), skip evaluating pending input.",
+ table.id(),
+ avgFileSize);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static TableStatsProvider.BasicFileStats getTableStats(MixedTable
table) {
+ TableFormat format = table.format();
+ if (format.equals(ICEBERG) || format.equals(MIXED_ICEBERG) ||
format.equals(MIXED_HIVE)) {
+ TableStatsProvider provider = MixedAndIcebergTableStatsProvider.INSTANCE;
+ return provider.collect(table);
+ } else {
+ // Unsupported table format for metadata-based evaluation
+ // PAIMON and HUDI formats are currently not supported because obtaining
basic statistics
+ // requires traversing manifest files, which would add extra planning
overhead.
+ // In the future, if there is a more efficient way to get these stats,
+ // we can extend TableStatsProvider to support these formats.
+ return null;
+ }
+ }
+
+ public static boolean isPartitionPendingNecessary(
+ OptimizingConfig config, long partitionSquaredErrorSum, long
partitionFileCount) {
+ long mseTolerance = config.getEvaluationMseTolerance();
+ return partitionFileCount > 1
+ && (mseTolerance == 0
+ || (double) partitionSquaredErrorSum / partitionFileCount
+ >= (double) mseTolerance * mseTolerance);
+ }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MixedAndIcebergTableStatsProvider.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MixedAndIcebergTableStatsProvider.java
new file mode 100644
index 000000000..8ec04e12b
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/MixedAndIcebergTableStatsProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import org.apache.amoro.table.MixedTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.util.PropertyUtil;
+
+import java.util.Map;
+
+public class MixedAndIcebergTableStatsProvider extends TableStatsProvider {
+
+ public static final MixedAndIcebergTableStatsProvider INSTANCE =
+ new MixedAndIcebergTableStatsProvider();
+
+ private MixedAndIcebergTableStatsProvider() {}
+
+ @Override
+ public BasicFileStats collect(MixedTable table) {
+ BasicFileStats stats = new BasicFileStats();
+ if (table.isUnkeyedTable()) {
+ acceptSnapshotIfPresent(stats, table.asUnkeyedTable().currentSnapshot());
+ } else {
+ acceptSnapshotIfPresent(stats,
table.asKeyedTable().baseTable().currentSnapshot());
+ acceptSnapshotIfPresent(stats,
table.asKeyedTable().changeTable().currentSnapshot());
+ }
+ return stats;
+ }
+
+ private void acceptSnapshotIfPresent(BasicFileStats stats, Snapshot
snapshot) {
+ if (snapshot != null) {
+ stats.accept(snapshot.summary());
+ }
+ }
+
+ public static class BasicFileStats extends TableStatsProvider.BasicFileStats
{
+ public void accept(Map<String, String> summary) {
+ deleteFileCnt +=
+ PropertyUtil.propertyAsInt(summary,
SnapshotSummary.TOTAL_DELETE_FILES_PROP, 0);
+ dataFileCnt += PropertyUtil.propertyAsInt(summary,
SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+ totalFileSize +=
+ PropertyUtil.propertyAsLong(summary,
SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+ }
+ }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/TableStatsProvider.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/TableStatsProvider.java
new file mode 100644
index 000000000..99411eff2
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/evaluation/TableStatsProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import org.apache.amoro.table.MixedTable;
+
+public abstract class TableStatsProvider {
+ /** Collect basic file statistics for the given table. */
+ public abstract BasicFileStats collect(MixedTable table);
+
+ /** Statistics representation of basic file metrics. */
+ public static class BasicFileStats {
+ /** Count of delete files. */
+ int deleteFileCnt = 0;
+ /** Count of data files. */
+ int dataFileCnt = 0;
+ /** Total size of all files in bytes. */
+ long totalFileSize = 0;
+
+ @VisibleForTesting
+ BasicFileStats() {}
+
+ public void accept(int dataFileCnt, int deleteFileCnt, long totalFileSize)
{
+ this.dataFileCnt += dataFileCnt;
+ this.deleteFileCnt += deleteFileCnt;
+ this.totalFileSize += totalFileSize;
+ }
+ }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
index ed108c94c..5f9061987 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java
@@ -61,6 +61,7 @@ public abstract class AbstractOptimizingEvaluator {
protected final TableSnapshot currentSnapshot;
protected final long lastFullOptimizingTime;
protected final long lastMinorOptimizingTime;
+ protected final long lastMajorOptimizingTime;
protected final int maxPendingPartitions;
protected boolean isInitialized = false;
protected Map<String, PartitionEvaluator> needOptimizingPlanMap =
Maps.newHashMap();
@@ -73,7 +74,8 @@ public abstract class AbstractOptimizingEvaluator {
TableSnapshot currentSnapshot,
int maxPendingPartitions,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
this.identifier = identifier;
this.config = config;
this.mixedTable = table;
@@ -81,6 +83,7 @@ public abstract class AbstractOptimizingEvaluator {
this.maxPendingPartitions = maxPendingPartitions;
this.lastFullOptimizingTime = lastFullOptimizingTime;
this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+ this.lastMajorOptimizingTime = lastMajorOptimizingTime;
}
protected void initEvaluator() {
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
index 69380c2c9..05bf82fab 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
@@ -65,7 +65,8 @@ public abstract class AbstractOptimizingPlanner extends
AbstractOptimizingEvalua
double availableCore,
long maxInputSizePerThread,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -73,7 +74,8 @@ public abstract class AbstractOptimizingPlanner extends
AbstractOptimizingEvalua
snapshot,
Integer.MAX_VALUE,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
this.partitionFilter = partitionFilter;
this.availableCore = availableCore;
this.planTime = System.currentTimeMillis();
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
index 0c0c6a422..313076339 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java
@@ -51,6 +51,7 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
protected final ServerTableIdentifier identifier;
protected final long lastMinorOptimizingTime;
protected final long lastFullOptimizingTime;
+ protected final long lastMajorOptimizingTime;
private CommonPartitionEvaluator evaluator;
private TaskSplitter taskSplitter;
protected MixedTable tableObject;
@@ -82,7 +83,8 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
Pair<Integer, StructLike> partition,
long planTime,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
this.identifier = identifier;
this.partition = partition;
this.tableObject = table;
@@ -90,6 +92,7 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
this.planTime = planTime;
this.lastMinorOptimizingTime = lastMinorOptimizingTime;
this.lastFullOptimizingTime = lastFullOptimizingTime;
+ this.lastMajorOptimizingTime = lastMajorOptimizingTime;
}
@Override
@@ -106,7 +109,13 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
protected CommonPartitionEvaluator buildEvaluator() {
return new CommonPartitionEvaluator(
- identifier, config, partition, planTime, lastMinorOptimizingTime,
lastFullOptimizingTime);
+ identifier,
+ config,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
index 7f1f44b19..2cd11cbd5 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/CommonPartitionEvaluator.java
@@ -22,6 +22,7 @@ import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.HealthScoreInfo;
import org.apache.amoro.optimizing.OptimizingType;
+import org.apache.amoro.optimizing.evaluation.MetadataBasedEvaluationEvent;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -47,6 +48,7 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
protected final OptimizingConfig config;
protected final long lastFullOptimizingTime;
protected final long lastMinorOptimizingTime;
+ protected final long lastMajorOptimizingTime;
protected final long fragmentSize;
protected final long minTargetSize;
protected final long planTime;
@@ -80,6 +82,9 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
protected long posDeleteFileSize = 0L;
protected long posDeleteFileRecords = 0L;
+ // mse stat
+ protected long fileSizeSquaredErrorSum = 0L;
+
private long cost = -1;
private Boolean necessary = null;
private OptimizingType optimizingType = null;
@@ -91,7 +96,8 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
Pair<Integer, StructLike> partition,
long planTime,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
this.identifier = identifier;
this.config = config;
this.partition = partition;
@@ -104,6 +110,7 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
}
this.planTime = planTime;
this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+ this.lastMajorOptimizingTime = lastMajorOptimizingTime;
this.lastFullOptimizingTime = lastFullOptimizingTime;
this.reachFullInterval =
config.getFullTriggerInterval() >= 0
@@ -128,6 +135,11 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
if (!config.isEnabled()) {
return false;
}
+ if (config.isMetadataBasedTriggerEnabled() &&
config.getEvaluationMseTolerance() > 0) {
+ // Update the file size squared error sum
+ updateFileSizeSquaredErrorSum(dataFile);
+ }
+
if (isFragmentFile(dataFile)) {
return addFragmentFile(dataFile, deletes);
} else if (isUndersizedSegmentFile(dataFile)) {
@@ -208,6 +220,31 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
return false;
}
+ private void updateFileSizeSquaredErrorSum(DataFile dataFile) {
+ // Only accumulate squared error for files smaller than `minTargetSize`
+ // For files larger than or equal to minTargetSize, diffSize will be 0,
contributing nothing to
+ // the error sum
+ long diffSize = minTargetSize - Math.min(dataFile.fileSizeInBytes(),
minTargetSize);
+ if (diffSize <= 0) {
+ return;
+ }
+
+ // Prevent overflow for diffSize * diffSize by saturating to Long.MAX_VALUE
+ final long prod;
+ if (diffSize > Long.MAX_VALUE / diffSize) {
+ prod = Long.MAX_VALUE;
+ } else {
+ prod = diffSize * diffSize;
+ }
+
+ // Prevent overflow when adding to fileSizeSquaredErrorSum (saturate to
Long.MAX_VALUE)
+ if (fileSizeSquaredErrorSum > Long.MAX_VALUE - prod) {
+ fileSizeSquaredErrorSum = Long.MAX_VALUE;
+ } else {
+ fileSizeSquaredErrorSum += prod;
+ }
+ }
+
protected boolean fileShouldFullOptimizing(DataFile dataFile,
List<ContentFile<?>> deleteFiles) {
if (config.isFullRewriteAllFiles()) {
return true;
@@ -288,6 +325,19 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
@Override
public boolean isNecessary() {
if (necessary == null) {
+ long lastPlanTime =
+ Math.max(
+ Math.max(lastMinorOptimizingTime, lastMajorOptimizingTime),
lastFullOptimizingTime);
+ if (config.isMetadataBasedTriggerEnabled()
+ && !MetadataBasedEvaluationEvent.isReachFallbackInterval(config,
lastPlanTime)) {
+ long fileCount = fragmentFileCount + undersizedSegmentFileCount;
+ if (!MetadataBasedEvaluationEvent.isPartitionPendingNecessary(
+ config, fileSizeSquaredErrorSum, fileCount)) {
+ LOG.debug("{} not necessary due to metadata-based evaluation, {}",
name(), this);
+ necessary = false;
+ return false;
+ }
+ }
if (isFullOptimizing()) {
necessary = isFullNecessary();
} else {
@@ -509,6 +559,10 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
return posDeleteFileRecords;
}
+ public long getFileSizeSquaredErrorSum() {
+ return fileSizeSquaredErrorSum;
+ }
+
public static class Weight implements PartitionEvaluator.Weight {
private final long cost;
@@ -532,6 +586,7 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
.add("undersizedSegmentSize", minTargetSize)
.add("planTime", planTime)
.add("lastMinorOptimizeTime", lastMinorOptimizingTime)
+ .add("lastMajorOptimizeTime", lastMajorOptimizingTime)
.add("lastFullOptimizeTime", lastFullOptimizingTime)
.add("fragmentFileCount", fragmentFileCount)
.add("fragmentFileSize", fragmentFileSize)
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
index 0f11a020d..cf6f5aec8 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizerEvaluator.java
@@ -33,7 +33,8 @@ public class IcebergOptimizerEvaluator extends
AbstractOptimizingEvaluator {
TableSnapshot currentSnapshot,
int maxPendingPartitions,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -41,7 +42,8 @@ public class IcebergOptimizerEvaluator extends
AbstractOptimizingEvaluator {
currentSnapshot,
maxPendingPartitions,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
@@ -52,6 +54,7 @@ public class IcebergOptimizerEvaluator extends
AbstractOptimizingEvaluator {
partition,
System.currentTimeMillis(),
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
index 21833283f..aed5ec65d 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergOptimizingPlanner.java
@@ -37,7 +37,8 @@ public class IcebergOptimizingPlanner extends
AbstractOptimizingPlanner {
double availableCore,
long maxInputSizePerThread,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -48,7 +49,8 @@ public class IcebergOptimizingPlanner extends
AbstractOptimizingPlanner {
availableCore,
maxInputSizePerThread,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
@@ -60,6 +62,7 @@ public class IcebergOptimizingPlanner extends
AbstractOptimizingPlanner {
partition,
planTime,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
index 998d08624..f1eb92104 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/IcebergPartitionPlan.java
@@ -40,7 +40,8 @@ public class IcebergPartitionPlan extends
AbstractPartitionPlan {
Pair<Integer, StructLike> partition,
long planTime,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
table,
@@ -48,7 +49,8 @@ public class IcebergPartitionPlan extends
AbstractPartitionPlan {
partition,
planTime,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
index 9f6bd279d..0c33e0b9b 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingEvaluator.java
@@ -36,7 +36,8 @@ public class MixedIcebergOptimizingEvaluator extends
AbstractOptimizingEvaluator
TableSnapshot currentSnapshot,
int maxPendingPartitions,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -44,7 +45,8 @@ public class MixedIcebergOptimizingEvaluator extends
AbstractOptimizingEvaluator
currentSnapshot,
maxPendingPartitions,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
protected Map<String, String> partitionProperties(Pair<Integer, StructLike>
partition) {
@@ -62,6 +64,7 @@ public class MixedIcebergOptimizingEvaluator extends
AbstractOptimizingEvaluator
System.currentTimeMillis(),
mixedTable.isKeyedTable(),
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
index c33072262..8e18b2f68 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergOptimizingPlanner.java
@@ -37,7 +37,8 @@ public class MixedIcebergOptimizingPlanner extends
AbstractOptimizingPlanner {
double availableCore,
long maxInputSizePerThread,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -48,7 +49,8 @@ public class MixedIcebergOptimizingPlanner extends
AbstractOptimizingPlanner {
availableCore,
maxInputSizePerThread,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
@@ -60,6 +62,7 @@ public class MixedIcebergOptimizingPlanner extends
AbstractOptimizingPlanner {
partition,
planTime,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
index 5d77f76ff..ae116be6e 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -56,7 +56,8 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
Pair<Integer, StructLike> partition,
long planTime,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
table,
@@ -64,7 +65,8 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
partition,
planTime,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
this.partitionProperties = TablePropertyUtil.getPartitionProperties(table,
partition.second());
}
@@ -121,7 +123,8 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
planTime,
isKeyedTable(),
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
public static class MixedIcebergPartitionEvaluator extends
CommonPartitionEvaluator {
@@ -137,9 +140,16 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
long planTime,
boolean keyedTable,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
- identifier, config, partition, planTime, lastMinorOptimizingTime,
lastFullOptimizingTime);
+ identifier,
+ config,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
this.keyedTable = keyedTable;
String optimizedTime =
partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME);
long lastBaseOptimizedTime = optimizedTime == null ? 0 :
Long.parseLong(optimizedTime);
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 8252147b3..0d804cdec 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -133,6 +133,19 @@ public class TableProperties {
"self-optimizing.min-plan-interval";
public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;
+ /** metadata-based evaluation related properties */
+ public static final String SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL =
+ "self-optimizing.evaluation.fallback-interval"; // fallback evaluation
interval in
+ // milliseconds
+
+ public static final int SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL_DEFAULT
=
+ -1; // metadata-based evaluation not in effect
+
+ public static final String
SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE =
+ "self-optimizing.evaluation.file-size.mse-tolerance"; // Mean Squared
Error (MSE) tolerance
+ // for file size evaluation
+ public static final long
SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT = 0;
+
/** table clean related properties */
public static final String ENABLE_TABLE_EXPIRE = "table-expire.enabled";
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/evaluation/TestMetadataBasedEvaluationEvent.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/evaluation/TestMetadataBasedEvaluationEvent.java
new file mode 100644
index 000000000..49229d481
--- /dev/null
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/evaluation/TestMetadataBasedEvaluationEvent.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing.evaluation;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.catalog.TableTestBase;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.data.ChangeAction;
+import org.apache.amoro.iceberg.Constants;
+import org.apache.amoro.io.IcebergDataTestHelpers;
+import org.apache.amoro.io.MixedDataTestHelpers;
+import org.apache.amoro.io.writer.RecordWithAction;
+import org.apache.amoro.optimizing.plan.CommonPartitionEvaluator;
+import org.apache.amoro.optimizing.plan.MixedIcebergPartitionPlan;
+import org.apache.amoro.optimizing.plan.PartitionEvaluator;
+import org.apache.amoro.optimizing.scan.IcebergTableFileScanHelper;
+import org.apache.amoro.optimizing.scan.KeyedTableFileScanHelper;
+import org.apache.amoro.optimizing.scan.TableFileScanHelper;
+import org.apache.amoro.optimizing.scan.UnkeyedTableFileScanHelper;
+import org.apache.amoro.properties.HiveTableProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.KeyedTableSnapshot;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableProperties;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.amoro.utils.TablePropertyUtil;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class TestMetadataBasedEvaluationEvent extends TableTestBase {
+
+ public TestMetadataBasedEvaluationEvent(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ public static final Schema TABLE_SCHEMA =
+ new Schema(
+ Lists.newArrayList(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "name", Types.StringType.get()),
+ Types.NestedField.required(3, "ts", Types.LongType.get()),
+ Types.NestedField.required(4, "op_time",
Types.TimestampType.withoutZone())),
+ Sets.newHashSet(1, 2, 3, 4));
+ public static final PartitionSpec SPEC =
+ PartitionSpec.builderFor(TABLE_SCHEMA).day("op_time").build();
+
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ new BasicCatalogTestHelper(TableFormat.ICEBERG),
+ new BasicTableTestHelper(TABLE_SCHEMA, true, SPEC)
+ },
+ {new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), new
BasicTableTestHelper(true, true)}
+ };
+ }
+
+ public void initData() throws IOException {
+ if (getMixedTable().isKeyedTable()) {
+ writeBaseStore(getMixedTable().asKeyedTable(), initRecords(1, 4, 0,
"2022-01-01T12:00:00"));
+ writeBaseStore(getMixedTable().asKeyedTable(), initRecords(5, 8, 0,
"2022-01-01T12:00:00"));
+
+ } else {
+ write(getMixedTable().asUnkeyedTable(), initRecords(1, "aaa", 0, 1,
ChangeAction.INSERT));
+ write(getMixedTable().asUnkeyedTable(), initRecords(2, "bbb", 0, 1,
ChangeAction.INSERT));
+ }
+ }
+
+ private List<RecordWithAction> initRecords(
+ int id, String name, long ts, int day, ChangeAction action) {
+ ImmutableList.Builder<RecordWithAction> builder = ImmutableList.builder();
+ builder.add(
+ new RecordWithAction(
+ MixedDataTestHelpers.createRecord(
+ id, name, ts, String.format("2022-01-%02dT12:00:00", day)),
+ action));
+
+ return builder.build();
+ }
+
+ private List<Record> initRecords(int from, int to, long ts, String opTime) {
+ ImmutableList.Builder<Record> builder = ImmutableList.builder();
+ for (int i = from; i <= to; i++) {
+ builder.add(tableTestHelper().generateTestRecord(i, i + "", ts, opTime));
+ }
+
+ return builder.build();
+ }
+
+ private void write(UnkeyedTable table, List<RecordWithAction> list) throws
IOException {
+ WriteResult result = IcebergDataTestHelpers.delta(table, list);
+
+ RowDelta rowDelta = table.newRowDelta();
+ Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
+ Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+ }
+
+ private void writeBaseStore(KeyedTable keyedTable, List<Record> records) {
+ List<DataFile> baseFiles =
+ tableTestHelper()
+ .writeBaseStore(keyedTable, keyedTable.beginTransaction(""),
records, false);
+ AppendFiles baseAppend = keyedTable.baseTable().newAppend();
+ baseFiles.forEach(baseAppend::appendFile);
+ baseAppend.commit();
+ }
+
+ private void writeChangeStore(KeyedTable keyedTable, List<DataFile>
dataFiles) {
+ AppendFiles appendFiles = keyedTable.changeTable().newAppend();
+ dataFiles.forEach(appendFiles::appendFile);
+ appendFiles.commit();
+ }
+
+ public void addChangeStoreData() throws IOException {
+ MixedTable mixedTable = getMixedTable();
+ if (mixedTable.isUnkeyedTable()) {
+ write(mixedTable.asUnkeyedTable(), initRecords(1, "aaa", 0, 1,
ChangeAction.DELETE));
+ write(mixedTable.asUnkeyedTable(), initRecords(2, "ccc", 0, 1,
ChangeAction.UPDATE_AFTER));
+ } else {
+ writeChangeStore(
+ mixedTable.asKeyedTable(),
+ tableTestHelper()
+ .writeChangeStore(
+ mixedTable.asKeyedTable(),
+ mixedTable.asKeyedTable().beginTransaction(""),
+ ChangeAction.INSERT,
+ initRecords(1, 2, 0, "2022-01-01T12:00:00"),
+ false));
+ writeChangeStore(
+ mixedTable.asKeyedTable(),
+ tableTestHelper()
+ .writeChangeStore(
+ mixedTable.asKeyedTable(),
+ mixedTable.asKeyedTable().beginTransaction(""),
+ ChangeAction.INSERT,
+ initRecords(3, 8, 0, "2022-01-01T12:00:00"),
+ false));
+ writeChangeStore(
+ mixedTable.asKeyedTable(),
+ tableTestHelper()
+ .writeChangeStore(
+ mixedTable.asKeyedTable(),
+ mixedTable.asKeyedTable().beginTransaction(""),
+ ChangeAction.DELETE,
+ initRecords(1, 1, 0, "2022-01-01T12:00:00"),
+ false));
+ writeChangeStore(
+ mixedTable.asKeyedTable(),
+ tableTestHelper()
+ .writeChangeStore(
+ mixedTable.asKeyedTable(),
+ mixedTable.asKeyedTable().beginTransaction(""),
+ ChangeAction.DELETE,
+ initRecords(2, 2, 0, "2022-01-01T12:00:00"),
+ false));
+ writeChangeStore(
+ mixedTable.asKeyedTable(),
+ tableTestHelper()
+ .writeChangeStore(
+ mixedTable.asKeyedTable(),
+ mixedTable.asKeyedTable().beginTransaction(""),
+ ChangeAction.DELETE,
+ initRecords(3, 4, 0, "2022-01-01T12:00:00"),
+ false));
+ }
+ }
+
+ @Test
+ public void test_evaluating_metadataBasedTriggerEnabled() {
+ OptimizingConfig config = getDefaultOptimizingConfig();
+ Assert.assertFalse(config.isMetadataBasedTriggerEnabled());
+
+ config.setEvaluationFallbackInterval(Long.MAX_VALUE);
+ Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+ }
+
+ @Test
+ public void test_evaluating_emptyTable() {
+ // Temporarily set self-optimizing.evaluation.fallback-interval to
Long.MAX_VALUE to prevent
+ // triggering due to reaching the fallback interval.
+ OptimizingConfig config =
+
getDefaultOptimizingConfig().setEvaluationFallbackInterval(Long.MAX_VALUE);
+ MixedTable table = getMixedTable();
+
+ // Verify the empty table stats
+ TableStatsProvider.BasicFileStats stats =
+ MixedAndIcebergTableStatsProvider.INSTANCE.collect(table);
+ Assert.assertEquals(0, stats.dataFileCnt);
+ Assert.assertEquals(0, stats.totalFileSize);
+ Assert.assertEquals(0, stats.deleteFileCnt);
+
+ // Verify the empty table evaluation
+ Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+
Assert.assertFalse(MetadataBasedEvaluationEvent.isReachFallbackInterval(config,
0L));
+
Assert.assertFalse(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config,
table, 0L));
+ }
+
+ @Test
+ public void test_evaluating_nonEmptyTable() throws IOException {
+ initData();
+ OptimizingConfig config = getDefaultOptimizingConfig();
+ MixedTable table = getMixedTable();
+
+ // Verify the nonEmpty table stats
+ TableStatsProvider.BasicFileStats stats =
+ MixedAndIcebergTableStatsProvider.INSTANCE.collect(table);
+ Assert.assertTrue(stats.dataFileCnt > 0);
+ Assert.assertTrue(stats.totalFileSize > 0);
+ Assert.assertEquals(0, stats.deleteFileCnt);
+
+ // 1. Test for metadata-based trigger disabled.
+ config.setEvaluationFallbackInterval(-1);
+ Assert.assertFalse(config.isMetadataBasedTriggerEnabled());
+
+ // 2. Test for metadata-based trigger enabled.
+ // Temporarily set self-optimizing.evaluation.fallback-interval to
Long.MAX_VALUE to prevent
+ // triggering due to reaching the fallback interval.
+ config.setEvaluationFallbackInterval(Long.MAX_VALUE);
+ Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+
Assert.assertFalse(MetadataBasedEvaluationEvent.isReachFallbackInterval(config,
0L));
+
+ // 2.1 Test for evaluating pendingInput necessary.
+ config.setTargetSize(134217728);
+
Assert.assertTrue(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config,
table, 0L));
+
+ // 2.2 Test for evaluating pendingInput not necessary
+ // Temporarily set self-optimizing.target-size to a lower value than the
average file size
+ config.setTargetSize(100);
+
Assert.assertFalse(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config,
table, 0L));
+
+ // 2.3 Test for evaluating pendingInput necessary because the fallback
interval has been
+ // reached.
+ config.setEvaluationFallbackInterval(0L);
+ Assert.assertTrue(config.isMetadataBasedTriggerEnabled());
+
Assert.assertTrue(MetadataBasedEvaluationEvent.isEvaluatingNecessary(config,
table, 0L));
+ }
+
+ @Test
+ public void test_evaluating_pendingInput_nonEmptyTable() throws IOException {
+ initData();
+ // Set metadata-based trigger enabled and fallback interval not reached.
+ OptimizingConfig config =
+
getDefaultOptimizingConfig().setEvaluationFallbackInterval(Long.MAX_VALUE);
+ MixedTable table = getMixedTable();
+
+ // 1. Test file size squared error sum updates during partition plan
+ // initialization using the default MSE tolerance (=0), expecting no updates.
+ TableFileScanHelper tableFileScanHelper = initTableFileScanHelper();
+ Map<String, PartitionEvaluator> partitionPlanMap =
+ initPartitionPlans(tableFileScanHelper, config);
+
+ Assert.assertEquals(1, partitionPlanMap.size());
+
+ long sizeSquaredErrorSum =
+ ((CommonPartitionEvaluator) new
ArrayList<>(partitionPlanMap.values()).get(0))
+ .getFileSizeSquaredErrorSum();
+ Assert.assertEquals(0L, sizeSquaredErrorSum);
+
+ List<PartitionEvaluator> necessaryPartitions =
+ partitionPlanMap.values().stream()
+ .filter(PartitionEvaluator::isNecessary)
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, necessaryPartitions.size());
+
+ // 2. Set mse tolerance > 0 to enable the file size squared error sum update
+ // during partition plan initialization.
+ config.setEvaluationMseTolerance(120000000);
+ partitionPlanMap = initPartitionPlans(tableFileScanHelper, config);
+ Assert.assertEquals(1, partitionPlanMap.size());
+
+ sizeSquaredErrorSum =
+ ((CommonPartitionEvaluator) new
ArrayList<>(partitionPlanMap.values()).get(0))
+ .getFileSizeSquaredErrorSum();
+ Assert.assertTrue(sizeSquaredErrorSum > 0);
+
+ necessaryPartitions =
+ partitionPlanMap.values().stream()
+ .filter(PartitionEvaluator::isNecessary)
+ .collect(Collectors.toList());
+ Assert.assertEquals(0, necessaryPartitions.size());
+
+ // 3. Verify the file size squared error sum is updated after adding change data.
+ addChangeStoreData();
+ partitionPlanMap = initPartitionPlans(initTableFileScanHelper(), config);
+ Assert.assertEquals(1, partitionPlanMap.size());
+
+ long sizeSquaredErrorSumUpdated1 =
+ ((CommonPartitionEvaluator) new
ArrayList<>(partitionPlanMap.values()).get(0))
+ .getFileSizeSquaredErrorSum();
+ Assert.assertNotEquals(sizeSquaredErrorSum, sizeSquaredErrorSumUpdated1);
+
+ necessaryPartitions =
+ partitionPlanMap.values().stream()
+ .filter(PartitionEvaluator::isNecessary)
+ .collect(Collectors.toList());
+ Assert.assertEquals(0, necessaryPartitions.size());
+
+ // 4. Lower the MSE tolerance so that the partition is evaluated as pending-necessary.
+ config.setEvaluationMseTolerance(100000000);
+ partitionPlanMap = initPartitionPlans(initTableFileScanHelper(), config);
+
+ long sizeSquaredErrorSumUpdated2 =
+ ((CommonPartitionEvaluator) new
ArrayList<>(partitionPlanMap.values()).get(0))
+ .getFileSizeSquaredErrorSum();
+ Assert.assertEquals(sizeSquaredErrorSumUpdated2,
sizeSquaredErrorSumUpdated1);
+
+ necessaryPartitions =
+ partitionPlanMap.values().stream()
+ .filter(PartitionEvaluator::isNecessary)
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, necessaryPartitions.size());
+ }
+
+ private TableFileScanHelper initTableFileScanHelper() {
+ MixedTable mixedTable = getMixedTable();
+ TableFileScanHelper tableFileScanHelper;
+ if (TableFormat.ICEBERG.equals(mixedTable.format())) {
+ tableFileScanHelper =
+ new IcebergTableFileScanHelper(
+ mixedTable.asUnkeyedTable(),
+ mixedTable.asUnkeyedTable().currentSnapshot().snapshotId());
+ } else {
+ if (mixedTable.isUnkeyedTable()) {
+ tableFileScanHelper =
+ new UnkeyedTableFileScanHelper(
+ mixedTable.asUnkeyedTable(),
+ mixedTable.asUnkeyedTable().currentSnapshot().snapshotId());
+ } else {
+ Snapshot currentSnapshot =
mixedTable.asKeyedTable().baseTable().currentSnapshot();
+ Snapshot changeSnapshot =
mixedTable.asKeyedTable().changeTable().currentSnapshot();
+
+ tableFileScanHelper =
+ new KeyedTableFileScanHelper(
+ mixedTable.asKeyedTable(),
+ new KeyedTableSnapshot(
+ currentSnapshot != null
+ ? currentSnapshot.snapshotId()
+ : Constants.INVALID_SNAPSHOT_ID,
+ changeSnapshot != null
+ ? changeSnapshot.snapshotId()
+ : Constants.INVALID_SNAPSHOT_ID));
+ }
+ }
+ tableFileScanHelper.withPartitionFilter(Expressions.alwaysTrue());
+
+ return tableFileScanHelper;
+ }
+
+ private Map<String, PartitionEvaluator> initPartitionPlans(
+ TableFileScanHelper tableFileScanHelper, OptimizingConfig config) {
+ Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();
+
+ long count = 0;
+ try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
+ tableFileScanHelper.scan()) {
+ for (TableFileScanHelper.FileScanResult fileScanResult : results) {
+ PartitionSpec partitionSpec =
tableFileScanHelper.getSpec(fileScanResult.file().specId());
+ StructLike partition = fileScanResult.file().partition();
+ String partitionPath = partitionSpec.partitionToPath(partition);
+ PartitionEvaluator evaluator =
+ partitionPlanMap.computeIfAbsent(
+ partitionPath,
+ ignore -> buildEvaluator(Pair.of(partitionSpec.specId(),
partition), config));
+ evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles());
+ count++;
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return partitionPlanMap;
+ }
+
+ private PartitionEvaluator buildEvaluator(
+ Pair<Integer, StructLike> partition, OptimizingConfig config) {
+ if (getMixedTable().isUnkeyedTable()) {
+ return new CommonPartitionEvaluator(
+ ServerTableIdentifier.of(TableTestHelper.TEST_TABLE_ID,
TableFormat.ICEBERG),
+ config,
+ partition,
+ System.currentTimeMillis(),
+ 0L,
+ 0L,
+ 0L);
+ } else {
+ Map<String, String> partitionProperties =
+ TablePropertyUtil.getPartitionProperties(getMixedTable(),
partition.second());
+ return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(
+ ServerTableIdentifier.of(TableTestHelper.TEST_TABLE_ID,
TableFormat.MIXED_ICEBERG),
+ config,
+ partition,
+ partitionProperties,
+ System.currentTimeMillis(),
+ getMixedTable().isKeyedTable(),
+ 0L,
+ 0L,
+ 0L);
+ }
+ }
+
+ @Test
+ public void test_setFileSizeMSETolerance() {
+ OptimizingConfig config = new OptimizingConfig();
+ Assert.assertEquals(0, config.getEvaluationMseTolerance());
+
+ config.setEvaluationMseTolerance(1000);
+ Assert.assertEquals(1000, config.getEvaluationMseTolerance());
+
+ config.setEvaluationMseTolerance(140000000);
+ Assert.assertEquals(140000000, config.getEvaluationMseTolerance());
+ }
+
+ @Test
+ public void testBasicStatsAccept() {
+ MixedAndIcebergTableStatsProvider.BasicFileStats stats =
+ new MixedAndIcebergTableStatsProvider.BasicFileStats();
+ // Initial state should be zeros
+ Assert.assertEquals(0, stats.deleteFileCnt);
+ Assert.assertEquals(0, stats.dataFileCnt);
+ Assert.assertEquals(0, stats.totalFileSize);
+ // Create first summary map
+ Map<String, String> summary1 = new HashMap<>();
+ summary1.put("total-delete-files", "5");
+ summary1.put("total-data-files", "10");
+ summary1.put("total-files-size", "1024");
+ stats.accept(summary1);
+ Assert.assertEquals(5, stats.deleteFileCnt);
+ Assert.assertEquals(10, stats.dataFileCnt);
+ Assert.assertEquals(1024, stats.totalFileSize);
+ // Create second summary map to test accumulation
+ Map<String, String> summary2 = new HashMap<>();
+ summary2.put("total-delete-files", "3");
+ summary2.put("total-data-files", "7");
+ summary2.put("total-files-size", "2048");
+ stats.accept(summary2);
+ // Values should be accumulated
+ Assert.assertEquals(8, stats.deleteFileCnt); // 5 + 3
+ Assert.assertEquals(17, stats.dataFileCnt); // 10 + 7
+ Assert.assertEquals(3072, stats.totalFileSize); // 1024 + 2048
+ }
+
+ @Test
+ public void testBasicTableStatsAcceptWithMissingProperties() {
+ MixedAndIcebergTableStatsProvider.BasicFileStats stats =
+ new MixedAndIcebergTableStatsProvider.BasicFileStats();
+ // Summary map with missing properties should use default values
+ Map<String, String> summary = new HashMap<>();
+ // Only provide one property, others should default to 0
+ summary.put("total-data-files", "15");
+ stats.accept(summary);
+ Assert.assertEquals(0, stats.deleteFileCnt); // default value
+ Assert.assertEquals(15, stats.dataFileCnt);
+ Assert.assertEquals(0, stats.totalFileSize); // default value
+ }
+
+ @Test
+ public void testBasicTableStatsAcceptWithInvalidValues() {
+ MixedAndIcebergTableStatsProvider.BasicFileStats stats =
+ new MixedAndIcebergTableStatsProvider.BasicFileStats();
+ // Summary map with invalid numeric values should use default values
+ Map<String, String> summary = new HashMap<>();
+ summary.put("total-delete-files", "invalid");
+ summary.put("total-data-files", "20");
+ summary.put("total-files-size", "not-a-number");
+
+ // Invalid values should throw NumberFormatException
+ Assert.assertThrows(NumberFormatException.class, () ->
stats.accept(summary));
+ }
+
+ OptimizingConfig getDefaultOptimizingConfig() {
+ return new OptimizingConfig()
+ .setEnabled(TableProperties.ENABLE_SELF_OPTIMIZING_DEFAULT)
+
.setAllowPartialCommit(TableProperties.SELF_OPTIMIZING_ALLOW_PARTIAL_COMMIT_DEFAULT)
+
.setMaxExecuteRetryCount(TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT)
+ .setOptimizerGroup(TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT)
+
.setFragmentRatio(TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT)
+
.setMinTargetSizeRatio(TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT)
+ .setMaxFileCount(TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT_DEFAULT)
+ .setOpenFileCost(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)
+ .setTargetSize(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT)
+ .setMaxTaskSize(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE_DEFAULT)
+ .setTargetQuota(TableProperties.SELF_OPTIMIZING_QUOTA_DEFAULT)
+
.setMinorLeastFileCount(TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT)
+
.setMinorLeastInterval(TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT)
+ .setMajorDuplicateRatio(
+
TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO_DEFAULT)
+
.setFullTriggerInterval(TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT)
+
.setFullRewriteAllFiles(TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT)
+ .setFilter(TableProperties.SELF_OPTIMIZING_FILTER_DEFAULT)
+ .setBaseHashBucket(TableProperties.BASE_FILE_INDEX_HASH_BUCKET_DEFAULT)
+ .setBaseRefreshInterval(TableProperties.BASE_REFRESH_INTERVAL_DEFAULT)
+
.setHiveRefreshInterval(HiveTableProperties.REFRESH_HIVE_INTERVAL_DEFAULT)
+
.setMinPlanInterval(TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT)
+ .setEvaluationFallbackInterval(
+
TableProperties.SELF_OPTIMIZING_EVALUATION_FALLBACK_INTERVAL_DEFAULT)
+ .setEvaluationMseTolerance(
+
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT);
+ }
+}
diff --git
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
index fdee3d7c4..8ff159d96 100644
---
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
+++
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingEvaluator.java
@@ -38,7 +38,8 @@ public class MixedHiveOptimizingEvaluator extends
MixedIcebergOptimizingEvaluato
TableSnapshot currentSnapshot,
int maxPendingPartitions,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -46,7 +47,8 @@ public class MixedHiveOptimizingEvaluator extends
MixedIcebergOptimizingEvaluato
currentSnapshot,
maxPendingPartitions,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
@@ -62,6 +64,7 @@ public class MixedHiveOptimizingEvaluator extends
MixedIcebergOptimizingEvaluato
System.currentTimeMillis(),
mixedTable.isKeyedTable(),
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
index aaf6f648a..68a03cb0d 100644
---
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
+++
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHiveOptimizingPlanner.java
@@ -42,7 +42,8 @@ public class MixedHiveOptimizingPlanner extends
AbstractOptimizingPlanner {
double availableCore,
long maxInputSizePerThread,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -53,7 +54,8 @@ public class MixedHiveOptimizingPlanner extends
AbstractOptimizingPlanner {
availableCore,
maxInputSizePerThread,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
this.hiveLocation = (((SupportHive) mixedTable).hiveLocation());
}
@@ -67,6 +69,7 @@ public class MixedHiveOptimizingPlanner extends
AbstractOptimizingPlanner {
hiveLocation,
planTime,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
index ac17e565c..760c441bf 100644
---
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
+++
b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/plan/MixedHivePartitionPlan.java
@@ -52,7 +52,8 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
String hiveLocation,
long planTime,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
table,
@@ -60,7 +61,8 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
partition,
planTime,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
this.hiveLocation = hiveLocation;
}
@@ -112,7 +114,8 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
planTime,
isKeyedTable(),
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
}
@Override
@@ -154,7 +157,8 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
long planTime,
boolean keyedTable,
long lastMinorOptimizingTime,
- long lastFullOptimizingTime) {
+ long lastFullOptimizingTime,
+ long lastMajorOptimizingTime) {
super(
identifier,
config,
@@ -163,7 +167,8 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
planTime,
keyedTable,
lastMinorOptimizingTime,
- lastFullOptimizingTime);
+ lastFullOptimizingTime,
+ lastMajorOptimizingTime);
this.hiveLocation = hiveLocation;
String optimizedTime =
partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME);