This is an automated email from the ASF dual-hosted git repository. jinsongzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push: new a1812818f [AMORO-3228] Rewrite pos delete files not written by optimizing process (#3229) a1812818f is described below commit a1812818ff247ff6216a70234c38a8f4dd0a54fe Author: ZhouJinsong <zhoujinsong0...@163.com> AuthorDate: Wed Oct 9 11:52:45 2024 +0800 [AMORO-3228] Rewrite pos delete files not written by optimizing process (#3229) Rewrite pos delete files not written by optimizing --- .../amoro/server/optimizing/plan/CommonPartitionEvaluator.java | 9 +++++++-- .../apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java | 5 ++++- .../src/main/java/org/apache/amoro/utils/TableFileUtil.java | 10 ++++++++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java index 013f11f87..8038e9761 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java @@ -24,6 +24,7 @@ import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.utils.TableFileUtil; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileContent; @@ -226,8 +227,12 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { public boolean segmentShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) { Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file."); - if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count() - >= 2) { + long posDeleteFileCount = + deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count(); + if (posDeleteFileCount == 1) { + return !TableFileUtil.isOptimizingPosDeleteFile( + dataFile.path().toString(), deletes.get(0).path().toString()); + } else if (posDeleteFileCount > 1) { combinePosSegmentFileCount++; return true; } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java index b709e18f2..ea3b7ef89 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java @@ -157,7 +157,10 @@ public class IcebergFanoutPosDeleteWriter<T> String fileDir = TableFileUtil.getFileDir(filePath.get().toString()); String deleteFilePath = format.addExtension( - String.format("%s/%s-delete-%s", fileDir, fileName, fileNameSuffix)); + String.format( + "%s/%s", + fileDir, + TableFileUtil.optimizingPosDeleteFileName(fileName, fileNameSuffix))); EncryptedOutputFile outputFile = encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath)); diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java index e1b57e45d..e140c79eb 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class TableFileUtil { private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class); + private static final String POS_DELETE_FILE_IDENTIFIER = "delete"; /** * Parse file name form file path @@ -192,4 +193,13 @@ public class TableFileUtil { Path p = new Path(path); return p.getParent().toString(); } + + public static String optimizingPosDeleteFileName(String dataFileName, String suffix) { + return String.format("%s-%s-%s", dataFileName, POS_DELETE_FILE_IDENTIFIER, suffix); + } + + public static boolean isOptimizingPosDeleteFile(String dataFilePath, String posDeleteFilePath) { + return getFileName(posDeleteFilePath) + .startsWith(String.format("%s-%s", getFileName(dataFilePath), POS_DELETE_FILE_IDENTIFIER)); + } }