xxubai commented on code in PR #3776:
URL: https://github.com/apache/amoro/pull/3776#discussion_r2480748442
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -451,19 +462,132 @@ public List<PartitionBaseInfo> getTablePartitions(AmoroTable<?> amoroTable) {
getTableFilesInternal(amoroTable, null, null);
try {
Review Comment:
We can use try-with-resources here so the file iterable is closed automatically:
```suggestion
try (CloseableIterable<PartitionFileBaseInfo> tableFiles
= getTableFilesInternal(amoroTable, null, null)) {
for (PartitionFileBaseInfo fileInfo : tableFiles) {
refreshPartitionBasicInfo(fileInfo, partitionBaseInfoHashMap);
}
} catch (IOException e) {
LOG.warn("Failed to close the manifest reader.", e);
}
```
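For reference, here is a self-contained sketch of the same pattern. `FileInfo`, `PartitionStats` and `ClosableFiles` are hypothetical stand-ins for `PartitionFileBaseInfo`, `PartitionBaseInfo` and `CloseableIterable`. Only the try-with-resources shape and the `computeIfAbsent` aggregation mirror the PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: FileInfo ~ PartitionFileBaseInfo,
// PartitionStats ~ PartitionBaseInfo, ClosableFiles ~ CloseableIterable.
class TryWithResourcesSketch {

  static final class FileInfo {
    final String partition;
    final long fileSize;
    final long commitTime;

    FileInfo(String partition, long fileSize, long commitTime) {
      this.partition = partition;
      this.fileSize = fileSize;
      this.commitTime = commitTime;
    }
  }

  static final class PartitionStats {
    long fileCount;
    long fileSize;
    long lastCommitTime;
  }

  // Iterable + Closeable; records whether close() was called.
  static final class ClosableFiles implements Iterable<FileInfo>, Closeable {
    private final List<FileInfo> files;
    boolean closed;

    ClosableFiles(List<FileInfo> files) {
      this.files = files;
    }

    @Override
    public Iterator<FileInfo> iterator() {
      return files.iterator();
    }

    @Override
    public void close() throws IOException {
      closed = true;
    }
  }

  static Map<String, PartitionStats> aggregate(ClosableFiles tableFiles) {
    Map<String, PartitionStats> stats = new HashMap<>();
    // close() runs automatically when the block exits, normally or via an exception.
    try (ClosableFiles files = tableFiles) {
      for (FileInfo file : files) {
        PartitionStats p = stats.computeIfAbsent(file.partition, k -> new PartitionStats());
        p.fileCount++;
        p.fileSize += file.fileSize;
        p.lastCommitTime = Math.max(p.lastCommitTime, file.commitTime);
      }
    } catch (IOException e) {
      // Only close() can throw IOException here; the collected stats are still returned.
      System.err.println("Failed to close the manifest reader: " + e);
    }
    return stats;
  }
}
```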
##########
amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java:
##########
@@ -133,6 +134,19 @@ private TableProperties() {}
"self-optimizing.min-plan-interval";
public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;
+ /** metric-based evaluation related properties */
+ public static final String SELF_OPTIMIZING_EVALUATION_AVERAGE_FILE_SIZE_TOLERANCE =
+ "self-optimizing.evaluation.average-file-size.tolerance"; // the minimum tolerance value for
+ // the average
+ // partition file size (between 0 and (self-optimizing.target-size))
+ public static final MemorySize SELF_OPTIMIZING_EVALUATION_AVERAGE_FILE_SIZE_TOLERANCE_DEFAULT =
Review Comment:
Can we use **byte size** to unify the file size unit?
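A minimal sketch of what a byte-based property could look like, assuming the value is stored as a plain `long` byte count, as other size properties such as `self-optimizing.target-size` appear to be. The default of 0 and the clamp into `[0, target-size]` are hypothetical choices for illustration; rejecting out-of-range values would be equally reasonable.

```java
import java.util.Map;

// Sketch: store the tolerance as a plain byte count (long).
// The key comes from the PR; the default value is a hypothetical placeholder.
class ByteSizePropertySketch {
  static final String SELF_OPTIMIZING_EVALUATION_AVERAGE_FILE_SIZE_TOLERANCE =
      "self-optimizing.evaluation.average-file-size.tolerance";

  // Hypothetical default in bytes (the PR uses a MemorySize constant instead).
  static final long SELF_OPTIMIZING_EVALUATION_AVERAGE_FILE_SIZE_TOLERANCE_DEFAULT = 0L;

  /** Reads the tolerance in bytes and clamps it into [0, targetSize]. */
  static long toleranceBytes(Map<String, String> properties, long targetSize) {
    String raw = properties.get(SELF_OPTIMIZING_EVALUATION_AVERAGE_FILE_SIZE_TOLERANCE);
    long value =
        raw == null
            ? SELF_OPTIMIZING_EVALUATION_AVERAGE_FILE_SIZE_TOLERANCE_DEFAULT
            : Long.parseLong(raw.trim());
    return Math.max(0L, Math.min(value, targetSize));
  }
}
```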
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java:
##########
@@ -115,7 +116,11 @@ public void execute(TableRuntime tableRuntime) {
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) {
- tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+ if (!defaultTableRuntime.getOptimizingConfig().isEventBasedTriggerEnabled()
+ || MetricBasedRefreshEvent.isEvaluatingPendingInputNecessary(
Review Comment:
Will this cause an additional full table scan compared to before?
In addition, we should also check whether self-optimizing is enabled, so it
would be better to fold this check into `tryEvaluatingPendingInput` to avoid
the extra overhead.
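One possible shape for the combined check, as a hypothetical sketch: `RuntimeView` and its methods are stand-ins, not the actual `DefaultTableRuntime` API. The cheap enable flag is checked first, and the metric-based check only runs when the event-based trigger is on, so a table with self-optimizing disabled never pays for it.

```java
// Hypothetical sketch of folding the event-based check into tryEvaluatingPendingInput.
// RuntimeView and its methods are stand-ins, not Amoro's DefaultTableRuntime API.
class CombinedEvaluationSketch {
  interface RuntimeView {
    boolean isOptimizingEnabled();

    boolean isEventBasedTriggerEnabled();

    // Stand-in for MetricBasedRefreshEvent.isEvaluatingPendingInputNecessary(...).
    boolean isEvaluationNecessary();

    // Stand-in for the expensive pending-input evaluation (table scan).
    void evaluatePendingInput();
  }

  static void tryEvaluatingPendingInput(RuntimeView runtime) {
    // Cheapest check first: nothing to evaluate when self-optimizing is disabled.
    if (!runtime.isOptimizingEnabled()) {
      return;
    }
    // Only consult the metric-based check when the event-based trigger is enabled.
    if (runtime.isEventBasedTriggerEnabled() && !runtime.isEvaluationNecessary()) {
      return;
    }
    runtime.evaluatePendingInput();
  }
}
```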
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -609,6 +733,11 @@ public List<OptimizingTaskInfo> getOptimizingTaskInfos(
private CloseableIterable<PartitionFileBaseInfo> getTableFilesInternal(
AmoroTable<?> amoroTable, String partition, Integer specId) {
MixedTable mixedTable = getTable(amoroTable);
+ return getTableFilesInternal(mixedTable, partition, specId);
+ }
+
+ private CloseableIterable<PartitionFileBaseInfo> getTableFilesInternal(
Review Comment:
If I understand correctly, after the event-triggered evaluation, a full
table scan is performed to collect partition information, which can be very
expensive (especially for large tables with hundreds of thousands of files).
Perhaps we can optimize this part when upgrading the Iceberg version and
introducing **PartitionStatistics**.
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -451,19 +462,132 @@ public List<PartitionBaseInfo> getTablePartitions(AmoroTable<?> amoroTable) {
getTableFilesInternal(amoroTable, null, null);
try {
for (PartitionFileBaseInfo fileInfo : tableFiles) {
- if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
- PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
- partitionBaseInfo.setPartition(fileInfo.getPartition());
- partitionBaseInfo.setSpecId(fileInfo.getSpecId());
- partitionBaseInfoHashMap.put(fileInfo.getPartition(), partitionBaseInfo);
- }
- PartitionBaseInfo partitionInfo = partitionBaseInfoHashMap.get(fileInfo.getPartition());
- partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
- partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize());
- partitionInfo.setLastCommitTime(
- partitionInfo.getLastCommitTime() > fileInfo.getCommitTime()
- ? partitionInfo.getLastCommitTime()
- : fileInfo.getCommitTime());
+ refreshPartitionBasicInfo(fileInfo, partitionBaseInfoHashMap);
+ }
+ } finally {
+ try {
+ tableFiles.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close the manifest reader.", e);
+ }
+ }
+ return new ArrayList<>(partitionBaseInfoHashMap.values());
+ }
+
+ /**
+ * Create partition base information from a PartitionFileBaseInfo instance.
+ *
+ * @param fileInfo Partition file base information, used to obtain partition information
+ * @return Returns the partition base information corresponding to the partition
+ */
+ private PartitionBaseInfo createPartitionBaseInfoFromPartitionFile(
+ PartitionFileBaseInfo fileInfo) {
+ PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
+ partitionBaseInfo.setPartition(fileInfo.getPartition());
+ partitionBaseInfo.setSpecId(fileInfo.getSpecId());
+ return partitionBaseInfo;
+ }
+
+ /**
+ * Refresh the basic information of a partition
+ *
+ * @param fileInfo Partition file base information
+ * @param partitionBaseInfoHashMap A hashmap containing the base information of all partitions
+ */
+ private void refreshPartitionBasicInfo(
+ PartitionFileBaseInfo fileInfo, Map<String, PartitionBaseInfo> partitionBaseInfoHashMap) {
+ // Get the partitionBaseInfo instance
+ PartitionBaseInfo partitionInfo =
+ partitionBaseInfoHashMap.computeIfAbsent(
+ fileInfo.getPartition(), key -> createPartitionBaseInfoFromPartitionFile(fileInfo));
+ // Update the number of files
+ partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
+ // Update the total file size
+ partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize());
+ // Update the last commit time
+ partitionInfo.setLastCommitTime(
+ partitionInfo.getLastCommitTime() > fileInfo.getCommitTime()
+ ? partitionInfo.getLastCommitTime()
+ : fileInfo.getCommitTime());
+ }
+
+ /**
+ * Refresh and update the detailed properties of a partition based on file information.
+ *
+ * <p>This method primarily updates statistical properties of the partition, such as the sum of
+ * squared errors of file sizes, and the counts of base files, insert files, eq-delete files, and
+ * pos-delete files.</>
+ *
+ * @param fileInfo Partition file base information
+ * @param partitionBaseInfoHashMap A hashmap containing basic information about all partitions
+ * @param minTargetSize The minimum target size used to limit the file size and calculate the sum
+ * of squared errors.
+ */
+ private void refreshPartitionDetailProperties(
+ PartitionFileBaseInfo fileInfo,
+ Map<String, PartitionBaseInfo> partitionBaseInfoHashMap,
+ long minTargetSize) {
+ PartitionBaseInfo partitionInfo =
+ partitionBaseInfoHashMap.computeIfAbsent(
+ fileInfo.getPartition(), key -> createPartitionBaseInfoFromPartitionFile(fileInfo));
+ // Update the file-size-squared-error-sum
+ long actualSize = Math.min(fileInfo.getFileSize(), minTargetSize);
+ long diff = minTargetSize - actualSize;
+ partitionInfo.setProperty(
+ FILE_SIZE_SQUARED_ERROR_SUM,
+ (double)
+ partitionInfo.getPropertyOrDefault(
+ FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT)
+ + diff * diff);
+
+ // Update the count of base files, insert files, equality delete files, and position delete
+ // files
+ switch (DataFileType.fromName(fileInfo.getFileType())) {
+ case BASE_FILE:
+ partitionInfo.setProperty(
+ BASE_FILE_COUNT,
+ (long) partitionInfo.getPropertyOrDefault(BASE_FILE_COUNT, BASE_FILE_COUNT_DEFAULT)
+ + 1);
+ break;
+ case INSERT_FILE:
+ partitionInfo.setProperty(
+ INSERT_FILE_COUNT,
+ (long) partitionInfo.getPropertyOrDefault(INSERT_FILE_COUNT, INSERT_FILE_COUNT_DEFAULT)
+ + 1);
+ break;
+ case EQ_DELETE_FILE:
+ partitionInfo.setProperty(
+ EQ_DELETE_FILE_COUNT,
+ (long)
+ partitionInfo.getPropertyOrDefault(
+ EQ_DELETE_FILE_COUNT, EQ_DELETE_FILE_COUNT_DEFAULT)
+ + 1);
+ break;
+ case POS_DELETE_FILE:
+ partitionInfo.setProperty(
+ POS_DELETE_FILE_COUNT,
+ (long)
+ partitionInfo.getPropertyOrDefault(
+ POS_DELETE_FILE_COUNT, POS_DELETE_FILE_COUNT_DEFAULT)
+ + 1);
+ break;
+ }
+ }
+
+ /**
+ * Get a list of partition information for a specific table that contains the square error sum of
+ * the partition file size. This method calculates statistics for each partition of a table,
+ * including file count, total file size, and the sum of squared errors of file sizes, for use in
+ * further operations such as table optimization.
+ */
+ public List<PartitionBaseInfo> getTablePartitionsWithDetailProperties(
+ MixedTable table, long minTargetSize) {
+ Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
+ CloseableIterable<PartitionFileBaseInfo> tableFiles = getTableFilesInternal(table, null, null);
+ try {
Review Comment:
You can simply use a try-with-resources statement.
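A sketch of how the squared-error accumulation from `refreshPartitionDetailProperties` reads under try-with-resources. `SizedFiles` is a hypothetical stand-in for `CloseableIterable<PartitionFileBaseInfo>` that yields only file sizes, and it narrows `close()` so that no `catch` is needed; with the real iterable, `close()` declares `IOException`, so the existing warning log would move into a `catch` clause.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for CloseableIterable<PartitionFileBaseInfo> that yields file sizes.
class SquaredErrorSketch {
  static final class SizedFiles implements Iterable<Long>, Closeable {
    private final List<Long> sizes;
    boolean closed;

    SizedFiles(List<Long> sizes) {
      this.sizes = sizes;
    }

    @Override
    public Iterator<Long> iterator() {
      return sizes.iterator();
    }

    // Narrowed to throw no checked exception, so no catch block is needed below.
    @Override
    public void close() {
      closed = true;
    }
  }

  /** Sum of (minTargetSize - min(size, minTargetSize))^2; files at or above the target add 0. */
  static double squaredErrorSum(SizedFiles tableFiles, long minTargetSize) {
    double sum = 0.0;
    try (SizedFiles files = tableFiles) {
      for (long size : files) {
        long diff = minTargetSize - Math.min(size, minTargetSize);
        // Square in double rather than long to avoid overflow for very large targets.
        sum += (double) diff * diff;
      }
    }
    return sum;
  }
}
```

Squaring in `double` is a deliberate choice in the sketch: the diff computes `diff * diff` as a `long`, which overflows once `minTargetSize` exceeds roughly 3.04e9 bytes (the square root of `Long.MAX_VALUE`).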
##########
amoro-ams/src/main/java/org/apache/amoro/server/refresh/event/MetricBasedRefreshEvent.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.refresh.event;
+
+import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM;
+import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT;
+
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.server.dashboard.MixedAndIcebergTableDescriptor;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.descriptor.PartitionBaseInfo;
+import org.apache.amoro.utils.MemorySize;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class MetricBasedRefreshEvent {
+ private static final Logger logger = LoggerFactory.getLogger(MetricBasedRefreshEvent.class);
+
+ /**
+ * Determines if evaluating pending input is necessary.
+ *
+ * @param tableRuntime The runtime information of the table.
+ * @param table The table to be evaluated.
+ * @return true if evaluation is necessary, otherwise false.
+ */
+ public static boolean isEvaluatingPendingInputNecessary(
+ DefaultTableRuntime tableRuntime, MixedTable table) {
+ if (table.format() != TableFormat.ICEBERG && table.format() != TableFormat.MIXED_ICEBERG) {
+ logger.debug(
+ "MetricBasedRefreshEvent only support ICEBERG/MIXED_ICEBERG tables. Always return true for other table formats.");
+ return true;
+ }
+
+ // Perform periodic scheduling according to the fallback interval to avoid false positives or
+ // missed triggers based on metadata metric-driven evaluation
+ OptimizingConfig config = tableRuntime.getOptimizingConfig();
+ if (reachFallbackInterval(
+ tableRuntime.getLastPlanTime(), config.getEvaluationFallbackInterval())) {
+ logger.info("Maximum interval for evaluating table {} has reached.", table.id());
+ return true;
+ }
+
+ ExecutorService executorService = ThreadPools.getWorkerPool();
Review Comment:
Use a dedicated thread pool to avoid contention with other tasks on Iceberg's shared worker pool.
```suggestion
ExecutorService executorService =
IcebergThreadPools.getPlanningExecutor();
```
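A minimal sketch of a dedicated, bounded pool using only `java.util.concurrent`. The thread-name prefix and pool size are hypothetical parameters, and `IcebergThreadPools.getPlanningExecutor()` from the suggestion may be configured differently; the point is that metric-based evaluation gets its own named threads instead of competing on the shared worker pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a dedicated, bounded pool for metric-based evaluation.
// The thread-name prefix and pool size are hypothetical parameters.
class DedicatedPoolSketch {
  static ExecutorService newEvaluationPool(String namePrefix, int threads) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory =
        runnable -> {
          Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
          // Daemon threads do not keep the JVM alive on shutdown.
          thread.setDaemon(true);
          return thread;
        };
    // A fixed size bounds how many evaluations can run at once.
    return Executors.newFixedThreadPool(threads, factory);
  }
}
```

Named threads also make it easy to tell evaluation work apart from planning or commit work in thread dumps.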
##########
amoro-format-iceberg/src/main/java/org/apache/amoro/table/TablePartitionDetailProperties.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.table;
+
+/** Detailed table partition properties list. */
+public class TablePartitionDetailProperties {
Review Comment:
Can we simplify this name, e.g. `PartitionSummaryProperties`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.