JingsongLi commented on code in PR #852:
URL: https://github.com/apache/incubator-paimon/pull/852#discussion_r1161418186
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -226,28 +239,39 @@ private void expireUntil(long earliestId, long endExclusiveId) {
writeEarliestHint(endExclusiveId);
}
- private void expireMergeTreeFiles(String manifestListName) {
- expireMergeTreeFiles(getManifestEntriesFromManifestList(manifestListName));
+ // return a map of partition-buckets of which data files have been deleted
+ private Map<BinaryRow, Set<Integer>> expireMergeTreeFiles(String manifestListName) {
+ return expireMergeTreeFiles(getManifestEntriesFromManifestList(manifestListName));
}
@VisibleForTesting
- void expireMergeTreeFiles(Iterable<ManifestEntry> dataFileLog) {
+ Map<BinaryRow, Set<Integer>> expireMergeTreeFiles(Iterable<ManifestEntry> dataFileLog) {
// we cannot delete a data file directly when we meet a DELETE entry, because that
// file might be upgraded
Map<Path, List<Path>> dataFileToDelete = new HashMap<>();
Review Comment:
Just use `Map<Path, Triple<BinaryRow, Integer, List<Path>>> dataFileToDelete = new HashMap<>();`?
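To illustrate the suggestion: keeping the partition and bucket next to the files to delete would let the changed partition-buckets be derived from the same map that is already being built. The sketch below is standalone and makes several assumptions: `FileToDelete` is a hypothetical stand-in for `Triple<BinaryRow, Integer, List<Path>>`, `String` replaces both `BinaryRow` and `Path`, and none of the names come from the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: one map carries the partition, the bucket and the extra files. */
final class DataFileToDeleteSketch {

    /** Stand-in for Triple<BinaryRow, Integer, List<Path>> (assumption, not the PR's type). */
    static final class FileToDelete {
        final String partition;
        final int bucket;
        final List<String> extraFiles;

        FileToDelete(String partition, int bucket, List<String> extraFiles) {
            this.partition = partition;
            this.bucket = bucket;
            this.extraFiles = extraFiles;
        }
    }

    /** Derives partition -> buckets from the entries that are scheduled for deletion. */
    static Map<String, Set<Integer>> changedBuckets(Map<String, FileToDelete> dataFileToDelete) {
        Map<String, Set<Integer>> changed = new HashMap<>();
        for (FileToDelete file : dataFileToDelete.values()) {
            changed.computeIfAbsent(file.partition, p -> new HashSet<>()).add(file.bucket);
        }
        return changed;
    }
}
```

With this shape the second map of partition-buckets does not need to be maintained separately while the manifest entries are scanned.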
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -180,6 +189,10 @@ private void expireUntil(long earliestId, long endExclusiveId) {
}
}
+ // data files and changelog files in partition directories has been deleted
+ // then delete changed partition directories if they are empty
+ tryDeletePartitionDirectories(changedPartitions);
Review Comment:
Just name it `tryDeleteDirectories`; it may end up deleting only bucket directories.
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -327,4 +353,39 @@ private void writeEarliestHint(long earliest) {
throw new RuntimeException(e);
}
}
+
+ private void tryDeletePartitionDirectories(Map<BinaryRow, Set<Integer>> changedPartitions) {
+ Map<Integer, Set<Path>> deduplicate = new HashMap<>();
+ for (Map.Entry<BinaryRow, Set<Integer>> entry : changedPartitions.entrySet()) {
+ for (Integer bucket : entry.getValue()) {
+ // try to delete bucket directories
+ tryDeleteDirectory(pathFactory.bucketPath(entry.getKey(), bucket));
+ }
+ List<Path> hierarchicalPaths = pathFactory.getHierarchicalPartitionPath(entry.getKey());
+ int pathNum = hierarchicalPaths.size();
+ if (pathNum > 0) {
+ // try to delete the deepest partition directory
+ tryDeleteDirectory(hierarchicalPaths.get(pathNum - 1));
+ // deduplicate high level partition directories
+ for (int hierarchy = 0; hierarchy < pathNum - 1; hierarchy++) {
+ deduplicate
+ .computeIfAbsent(hierarchy, i -> new HashSet<>())
+ .add(hierarchicalPaths.get(hierarchy));
+ }
+ }
+ }
+
+ // from deepest to shallowest
+ for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) {
+ deduplicate.get(hierarchy).forEach(this::tryDeleteDirectory);
+ }
+ }
+
+ private void tryDeleteDirectory(Path path) {
Review Comment:
`tryDeleteEmptyDirectory`?
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -327,4 +353,39 @@ private void writeEarliestHint(long earliest) {
throw new RuntimeException(e);
}
}
+
+ private void tryDeletePartitionDirectories(Map<BinaryRow, Set<Integer>> changedPartitions) {
+ Map<Integer, Set<Path>> deduplicate = new HashMap<>();
+ for (Map.Entry<BinaryRow, Set<Integer>> entry : changedPartitions.entrySet()) {
+ for (Integer bucket : entry.getValue()) {
+ // try to delete bucket directories
+ tryDeleteDirectory(pathFactory.bucketPath(entry.getKey(), bucket));
+ }
+ List<Path> hierarchicalPaths = pathFactory.getHierarchicalPartitionPath(entry.getKey());
+ int pathNum = hierarchicalPaths.size();
+ if (pathNum > 0) {
+ // try to delete the deepest partition directory
+ tryDeleteDirectory(hierarchicalPaths.get(pathNum - 1));
+ // deduplicate high level partition directories
+ for (int hierarchy = 0; hierarchy < pathNum - 1; hierarchy++) {
+ deduplicate
+ .computeIfAbsent(hierarchy, i -> new HashSet<>())
+ .add(hierarchicalPaths.get(hierarchy));
+ }
+ }
+ }
+
+ // from deepest to shallowest
+ for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) {
+ deduplicate.get(hierarchy).forEach(this::tryDeleteDirectory);
Review Comment:
Why is there no NPE here?
`deduplicate.get(hierarchy)` may return null?
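One reading of the quoted loop: every partition adds levels `0` through `pathNum - 2`, so the keys of `deduplicate` always form a contiguous range starting at 0 and each index below `deduplicate.size()` has a value. That invariant is implicit, though. The sketch below shows one way it could be made explicit, using a list indexed by level instead of a map. It is standalone, uses `String` in place of `Path`, and the class and method names are hypothetical rather than taken from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: group parent partition directories by depth without nullable lookups. */
final class HierarchyDedup {

    /**
     * Collects every path except the deepest one of each partition, grouped by depth.
     * Index i of the result holds the distinct directories at depth i.
     */
    static List<Set<String>> groupByLevel(List<List<String>> hierarchicalPathsPerPartition) {
        List<Set<String>> levels = new ArrayList<>();
        for (List<String> paths : hierarchicalPathsPerPartition) {
            for (int level = 0; level < paths.size() - 1; level++) {
                // Grow the list on demand so that every index below size() is non-null.
                while (levels.size() <= level) {
                    levels.add(new LinkedHashSet<>());
                }
                levels.get(level).add(paths.get(level));
            }
        }
        return levels;
    }

    /** Flattens the levels from deepest to shallowest, the order in which deletion is attempted. */
    static List<String> deletionOrder(List<Set<String>> levels) {
        List<String> order = new ArrayList<>();
        for (int level = levels.size() - 1; level >= 0; level--) {
            order.addAll(levels.get(level));
        }
        return order;
    }
}
```

A cheaper alternative would be to keep the map and guard the lookup with `getOrDefault(hierarchy, Collections.emptySet())`.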
##########
paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java:
##########
@@ -81,6 +83,36 @@ public static String generatePartitionPath(LinkedHashMap<String, String> partiti
return suffixBuf.toString();
}
+ /**
+ * Generate all hierarchical paths from partition spec.
+ *
+ * <p>For example, if the partition spec is (pt1: '0601', pt2: '12', pt3: '30'), this method
+ * will return a list (start from index 0):
+ *
+ * <ul>
+ * <li>[root]/pt1=0601
+ * <li>[root]/pt1=0601/pt2=12
+ * <li>[root]/pt1=0601/pt2=12/pt3=30
+ * </ul>
+ */
+ public static List<String> generateHierarchicalPartitionPaths(
+ LinkedHashMap<String, String> partitionSpec) {
+ List<String> paths = new ArrayList<>();
+ if (partitionSpec.isEmpty()) {
+ paths.add("");
Review Comment:
Why not just return an empty list?
Do you want to delete the table path?
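A minimal sketch of the alternative raised here, assuming an unpartitioned table should yield no paths at all, so that callers never receive a path that resolves to the table root. It is a standalone illustration on plain strings with a hypothetical class name; the `key=value` joining is simplified, returns paths relative to the root, and does not reproduce any escaping done in `PartitionPathUtils`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: hierarchical partition paths with an empty result for an empty spec. */
final class HierarchicalPathsSketch {

    static List<String> generateHierarchicalPartitionPaths(
            LinkedHashMap<String, String> partitionSpec) {
        List<String> paths = new ArrayList<>();
        StringBuilder prefix = new StringBuilder();
        // An empty spec never enters the loop, so the result stays empty
        // and no caller can mistake "" for a deletable directory.
        for (Map.Entry<String, String> field : partitionSpec.entrySet()) {
            if (prefix.length() > 0) {
                prefix.append('/');
            }
            prefix.append(field.getKey()).append('=').append(field.getValue());
            paths.add(prefix.toString());
        }
        return paths;
    }
}
```

The `pathNum > 0` check in `tryDeletePartitionDirectories` would then skip unpartitioned tables without any special case.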
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -327,4 +353,39 @@ private void writeEarliestHint(long earliest) {
throw new RuntimeException(e);
}
}
+
+ private void tryDeletePartitionDirectories(Map<BinaryRow, Set<Integer>> changedPartitions) {
+ Map<Integer, Set<Path>> deduplicate = new HashMap<>();
+ for (Map.Entry<BinaryRow, Set<Integer>> entry : changedPartitions.entrySet()) {
+ for (Integer bucket : entry.getValue()) {
+ // try to delete bucket directories
+ tryDeleteDirectory(pathFactory.bucketPath(entry.getKey(), bucket));
+ }
+ List<Path> hierarchicalPaths = pathFactory.getHierarchicalPartitionPath(entry.getKey());
+ int pathNum = hierarchicalPaths.size();
+ if (pathNum > 0) {
+ // try to delete the deepest partition directory
+ tryDeleteDirectory(hierarchicalPaths.get(pathNum - 1));
+ // deduplicate high level partition directories
+ for (int hierarchy = 0; hierarchy < pathNum - 1; hierarchy++) {
+ deduplicate
+ .computeIfAbsent(hierarchy, i -> new HashSet<>())
+ .add(hierarchicalPaths.get(hierarchy));
+ }
+ }
+ }
+
+ // from deepest to shallowest
+ for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) {
+ deduplicate.get(hierarchy).forEach(this::tryDeleteDirectory);
+ }
+ }
+
+ private void tryDeleteDirectory(Path path) {
Review Comment:
Add a return value? Then we can check whether a deletion actually happened
here. If no bucket directory was deleted, we can skip checking the partition
directory.
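A sketch of what a boolean return could look like, written against `java.nio.file` rather than Paimon's own file abstraction so that it runs on its own. The class name and `cleanPartition` are hypothetical, and treating every `IOException` as "nothing deleted" is an assumption about the desired best-effort behavior.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical sketch: report whether an empty directory was actually removed. */
final class EmptyDirectoryCleaner {

    /** Returns true only if the path was a directory, was empty, and was removed. */
    static boolean tryDeleteEmptyDirectory(Path dir) {
        if (!Files.isDirectory(dir)) {
            return false;
        }
        try {
            // Files.delete refuses to remove a non-empty directory.
            Files.delete(dir);
            return true;
        } catch (IOException e) {
            // Best effort: a non-empty or concurrently removed directory is not an error.
            return false;
        }
    }

    /** Checks the partition directory only if at least one bucket directory was removed. */
    static boolean cleanPartition(List<Path> bucketDirs, Path partitionDir) {
        boolean anyBucketDeleted = false;
        for (Path bucket : bucketDirs) {
            anyBucketDeleted |= tryDeleteEmptyDirectory(bucket);
        }
        return anyBucketDeleted && tryDeleteEmptyDirectory(partitionDir);
    }
}
```

The short-circuit in `cleanPartition` is the saving described above: when no bucket directory disappears, the partition directory cannot have become empty through this call, so the extra file system request is skipped.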