This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new a1812818f [AMORO-3228] Rewrite pos delete files not written by optimizing process (#3229)
a1812818f is described below

commit a1812818ff247ff6216a70234c38a8f4dd0a54fe
Author: ZhouJinsong <zhoujinsong0...@163.com>
AuthorDate: Wed Oct 9 11:52:45 2024 +0800

    [AMORO-3228] Rewrite pos delete files not written by optimizing process (#3229)
    
    Rewrite pos delete files not written by optimizing
---
 .../amoro/server/optimizing/plan/CommonPartitionEvaluator.java |  9 +++++++--
 .../apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java   |  5 ++++-
 .../src/main/java/org/apache/amoro/utils/TableFileUtil.java    | 10 ++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
index 013f11f87..8038e9761 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
@@ -24,6 +24,7 @@ import org.apache.amoro.server.table.TableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
@@ -226,8 +227,12 @@ public class CommonPartitionEvaluator implements 
PartitionEvaluator {
 
   public boolean segmentShouldRewritePos(DataFile dataFile, 
List<ContentFile<?>> deletes) {
     Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported 
fragment file.");
-    if (deletes.stream().filter(delete -> delete.content() == 
FileContent.POSITION_DELETES).count()
-        >= 2) {
+    long posDeleteFileCount =
+        deletes.stream().filter(delete -> delete.content() == 
FileContent.POSITION_DELETES).count();
+    if (posDeleteFileCount == 1) {
+      return !TableFileUtil.isOptimizingPosDeleteFile(
+          dataFile.path().toString(), deletes.get(0).path().toString());
+    } else if (posDeleteFileCount > 1) {
       combinePosSegmentFileCount++;
       return true;
     }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
index b709e18f2..ea3b7ef89 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
@@ -157,7 +157,10 @@ public class IcebergFanoutPosDeleteWriter<T>
           String fileDir = TableFileUtil.getFileDir(filePath.get().toString());
           String deleteFilePath =
               format.addExtension(
-                  String.format("%s/%s-delete-%s", fileDir, fileName, 
fileNameSuffix));
+                  String.format(
+                      "%s/%s",
+                      fileDir,
+                      TableFileUtil.optimizingPosDeleteFileName(fileName, 
fileNameSuffix)));
           EncryptedOutputFile outputFile =
               encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath));
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
index e1b57e45d..e140c79eb 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TableFileUtil {
   private static final Logger LOG = 
LoggerFactory.getLogger(TableFileUtil.class);
+  private static final String POS_DELETE_FILE_IDENTIFIER = "delete";
 
   /**
    * Parse file name form file path
@@ -192,4 +193,13 @@ public class TableFileUtil {
     Path p = new Path(path);
     return p.getParent().toString();
   }
+
+  public static String optimizingPosDeleteFileName(String dataFileName, String 
suffix) {
+    return String.format("%s-%s-%s", dataFileName, POS_DELETE_FILE_IDENTIFIER, 
suffix);
+  }
+
+  public static boolean isOptimizingPosDeleteFile(String dataFilePath, String 
posDeleteFilePath) {
+    return getFileName(posDeleteFilePath)
+        .startsWith(String.format("%s-%s", getFileName(dataFilePath), 
POS_DELETE_FILE_IDENTIFIER));
+  }
 }

Reply via email to