This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch 0.7.x
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/0.7.x by this push:
new a2ec9a42e [AMORO-3228] Rewrite pos delete files not written by
optimizing process (#3229)
a2ec9a42e is described below
commit a2ec9a42ed065163f9e01a1e235aceaef4e1d07e
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Oct 9 11:52:45 2024 +0800
[AMORO-3228] Rewrite pos delete files not written by optimizing process
(#3229)
Rewrite position delete files that were not written by the optimizing process.
(cherry picked from commit a1812818ff247ff6216a70234c38a8f4dd0a54fe)
Signed-off-by: zhoujinsong <[email protected]>
---
.../amoro/server/optimizing/plan/CommonPartitionEvaluator.java | 9 +++++++--
.../apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java | 5 ++++-
.../src/main/java/org/apache/amoro/utils/TableFileUtil.java | 10 ++++++++++
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
index 6ed8214b1..357fc6ce4 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
@@ -24,6 +24,7 @@ import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
@@ -215,8 +216,12 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
public boolean segmentShouldRewritePos(DataFile dataFile,
List<ContentFile<?>> deletes) {
Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported
fragment file.");
- if (deletes.stream().filter(delete -> delete.content() ==
FileContent.POSITION_DELETES).count()
- >= 2) {
+ long posDeleteFileCount =
+ deletes.stream().filter(delete -> delete.content() ==
FileContent.POSITION_DELETES).count();
+ if (posDeleteFileCount == 1) {
+ return !TableFileUtil.isOptimizingPosDeleteFile(
+ dataFile.path().toString(), deletes.get(0).path().toString());
+ } else if (posDeleteFileCount > 1) {
combinePosSegmentFileCount++;
return true;
}
diff --git
a/amoro-core/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
b/amoro-core/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
index b709e18f2..ea3b7ef89 100644
---
a/amoro-core/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
+++
b/amoro-core/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
@@ -157,7 +157,10 @@ public class IcebergFanoutPosDeleteWriter<T>
String fileDir = TableFileUtil.getFileDir(filePath.get().toString());
String deleteFilePath =
format.addExtension(
- String.format("%s/%s-delete-%s", fileDir, fileName,
fileNameSuffix));
+ String.format(
+ "%s/%s",
+ fileDir,
+ TableFileUtil.optimizingPosDeleteFileName(fileName,
fileNameSuffix)));
EncryptedOutputFile outputFile =
encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath));
diff --git a/amoro-core/src/main/java/org/apache/amoro/utils/TableFileUtil.java
b/amoro-core/src/main/java/org/apache/amoro/utils/TableFileUtil.java
index e1b57e45d..e140c79eb 100644
--- a/amoro-core/src/main/java/org/apache/amoro/utils/TableFileUtil.java
+++ b/amoro-core/src/main/java/org/apache/amoro/utils/TableFileUtil.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TableFileUtil {
private static final Logger LOG =
LoggerFactory.getLogger(TableFileUtil.class);
+ private static final String POS_DELETE_FILE_IDENTIFIER = "delete";
/**
* Parse file name form file path
@@ -192,4 +193,13 @@ public class TableFileUtil {
Path p = new Path(path);
return p.getParent().toString();
}
+
+ public static String optimizingPosDeleteFileName(String dataFileName, String
suffix) {
+ return String.format("%s-%s-%s", dataFileName, POS_DELETE_FILE_IDENTIFIER,
suffix);
+ }
+
+ public static boolean isOptimizingPosDeleteFile(String dataFilePath, String
posDeleteFilePath) {
+ return getFileName(posDeleteFilePath)
+ .startsWith(String.format("%s-%s", getFileName(dataFilePath),
POS_DELETE_FILE_IDENTIFIER));
+ }
}