This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ebe09958 [core][bug] The empty data file directories are not deleted 
after expiring file store (#852)
1ebe09958 is described below

commit 1ebe099589c29d8b2e601f3d19e8e2cdedf77a2f
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 12 11:50:57 2023 +0800

    [core][bug] The empty data file directories are not deleted after expiring 
file store (#852)
---
 .../main/java/org/apache/paimon/utils/Triple.java  |  65 +++++
 .../paimon/operation/FileStoreCommitImpl.java      |   4 +-
 .../paimon/operation/FileStoreExpireImpl.java      |  80 +++++-
 .../apache/paimon/utils/FileStorePathFactory.java  |  13 +
 .../apache/paimon/utils/PartitionPathUtils.java    |  31 +++
 .../test/java/org/apache/paimon/TestFileStore.java |   6 +-
 .../org/apache/paimon/TestKeyValueGenerator.java   |  30 ++-
 .../operation/CleanedFileStoreExpireTest.java      |  44 ++++
 .../operation/FileStoreExpireDeleteDirTest.java    | 290 +++++++++++++++++++++
 .../paimon/operation/FileStoreTestUtils.java       | 125 +++++++++
 10 files changed, 676 insertions(+), 12 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Triple.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Triple.java
new file mode 100644
index 000000000..8068c0624
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Triple.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Container that accommodates three fields. */
+public final class Triple<T0, T1, T2> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public final T0 f0;
+    public final T1 f1;
+    public final T2 f2;
+
+    public static <T0, T1, T2> Triple<T0, T1, T2> of(T0 f0, T1 f1, T2 f2) {
+        return new Triple<>(f0, f1, f2);
+    }
+
+    private Triple(final T0 f0, final T1 f1, final T2 f2) {
+        this.f0 = f0;
+        this.f1 = f1;
+        this.f2 = f2;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Triple<?, ?, ?> triple = (Triple<?, ?, ?>) o;
+        return Objects.equals(f0, triple.f0)
+                && Objects.equals(f1, triple.f1)
+                && Objects.equals(f2, triple.f2);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(f0, f1, f2);
+    }
+
+    @Override
+    public String toString() {
+        return "(" + f0 + ',' + f1 + ',' + f2 + ')';
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index bc9b57d4c..1d307896d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
@@ -479,7 +480,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         }
     }
 
-    private boolean tryCommitOnce(
+    @VisibleForTesting
+    public boolean tryCommitOnce(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
             long identifier,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index d4e1e1754..29c0e3356 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -29,10 +30,12 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.Triple;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -161,12 +164,19 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
         // delete merge tree files
         // deleted merge tree files in a snapshot are not used by the next 
snapshot, so the range of
         // id should be (beginInclusiveId, endExclusiveId]
+        Map<BinaryRow, Set<Integer>> changedBuckets = new HashMap<>();
         for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ready to delete merge tree files not used by 
snapshot #" + id);
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
-            expireMergeTreeFiles(snapshot.deltaManifestList());
+            // expire merge tree files and collect changed buckets
+            expireMergeTreeFiles(snapshot.deltaManifestList())
+                    .forEach(
+                            (partition, buckets) ->
+                                    changedBuckets
+                                            .computeIfAbsent(partition, p -> 
new HashSet<>())
+                                            .addAll(buckets));
         }
 
         // delete changelog files
@@ -180,6 +190,10 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
             }
         }
 
+        // data files and changelog files in bucket directories have been 
deleted
+        // then delete changed bucket directories if they are empty
+        tryDeleteDirectories(changedBuckets);
+
         // delete manifests
         Snapshot exclusiveSnapshot = snapshotManager.snapshot(endExclusiveId);
         Set<ManifestFileMeta> manifestsInUse =
@@ -226,15 +240,17 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
         writeEarliestHint(endExclusiveId);
     }
 
-    private void expireMergeTreeFiles(String manifestListName) {
-        
expireMergeTreeFiles(getManifestEntriesFromManifestList(manifestListName));
+    // return a map of partition-buckets of which some data files have been 
deleted
+    private Map<BinaryRow, Set<Integer>> expireMergeTreeFiles(String 
manifestListName) {
+        return 
expireMergeTreeFiles(getManifestEntriesFromManifestList(manifestListName));
     }
 
     @VisibleForTesting
-    void expireMergeTreeFiles(Iterable<ManifestEntry> dataFileLog) {
+    Map<BinaryRow, Set<Integer>> expireMergeTreeFiles(Iterable<ManifestEntry> 
dataFileLog) {
         // we cannot delete a data file directly when we meet a DELETE entry, 
because that
         // file might be upgraded
-        Map<Path, List<Path>> dataFileToDelete = new HashMap<>();
+        // data file path -> (partition, bucket, extra file paths)
+        Map<Path, Triple<BinaryRow, Integer, List<Path>>> dataFileToDelete = 
new HashMap<>();
         for (ManifestEntry entry : dataFileLog) {
             Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
             Path dataFilePath = new Path(bucketPath, entry.file().fileName());
@@ -247,18 +263,26 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
                     for (String file : entry.file().extraFiles()) {
                         extraFiles.add(new Path(bucketPath, file));
                     }
-                    dataFileToDelete.put(dataFilePath, extraFiles);
+                    dataFileToDelete.put(
+                            dataFilePath, Triple.of(entry.partition(), 
entry.bucket(), extraFiles));
                     break;
                 default:
                     throw new UnsupportedOperationException(
                             "Unknown value kind " + entry.kind().name());
             }
         }
+
+        Map<BinaryRow, Set<Integer>> changedBuckets = new HashMap<>();
         dataFileToDelete.forEach(
-                (path, extraFiles) -> {
+                (path, triple) -> {
+                    // delete data files
                     fileIO.deleteQuietly(path);
-                    extraFiles.forEach(fileIO::deleteQuietly);
+                    triple.f2.forEach(fileIO::deleteQuietly);
+                    // record changed buckets
+                    changedBuckets.computeIfAbsent(triple.f0, p -> new 
HashSet<>()).add(triple.f1);
                 });
+
+        return changedBuckets;
     }
 
     private void expireChangelogFiles(String manifestListName) {
@@ -327,4 +351,44 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
             throw new RuntimeException(e);
         }
     }
+
+    private void tryDeleteDirectories(Map<BinaryRow, Set<Integer>> 
changedBuckets) {
+        // All directory paths are deduplicated and sorted by hierarchy level
+        Map<Integer, Set<Path>> deduplicate = new HashMap<>();
+        for (Map.Entry<BinaryRow, Set<Integer>> entry : 
changedBuckets.entrySet()) {
+            // try to delete bucket directories
+            for (Integer bucket : entry.getValue()) {
+                tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(), 
bucket));
+            }
+
+            List<Path> hierarchicalPaths = 
pathFactory.getHierarchicalPartitionPath(entry.getKey());
+            int hierarchies = hierarchicalPaths.size();
+            if (hierarchies == 0) {
+                continue;
+            }
+
+            if (tryDeleteEmptyDirectory(hierarchicalPaths.get(hierarchies - 
1))) {
+                // deduplicate high level partition directories
+                for (int hierarchy = 0; hierarchy < hierarchies - 1; 
hierarchy++) {
+                    Path path = hierarchicalPaths.get(hierarchy);
+                    deduplicate.computeIfAbsent(hierarchy, i -> new 
HashSet<>()).add(path);
+                }
+            }
+        }
+
+        // from deepest to shallowest
+        for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; 
hierarchy--) {
+            deduplicate.get(hierarchy).forEach(this::tryDeleteEmptyDirectory);
+        }
+    }
+
+    private boolean tryDeleteEmptyDirectory(Path path) {
+        try {
+            fileIO.delete(path, false);
+            return true;
+        } catch (IOException e) {
+            LOG.debug("Failed to delete directory '{}'. Check whether it is 
empty.", path);
+            return false;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 242dd8648..cf4220b8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -28,8 +28,10 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.ConfigOptions.key;
 
@@ -124,6 +126,17 @@ public class FileStorePathFactory {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
+    public List<Path> getHierarchicalPartitionPath(BinaryRow partition) {
+        return PartitionPathUtils.generateHierarchicalPartitionPaths(
+                        partitionComputer.generatePartValues(
+                                Preconditions.checkNotNull(
+                                        partition,
+                                        "Partition binary row is null. This is 
unexpected.")))
+                .stream()
+                .map(p -> new Path(root + "/" + p))
+                .collect(Collectors.toList());
+    }
+
     @VisibleForTesting
     public String uuid() {
         return uuid;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index 113c15daa..30e359ffd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -20,8 +20,10 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.fs.Path;
 
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Utils for file system. */
@@ -81,6 +83,35 @@ public class PartitionPathUtils {
         return suffixBuf.toString();
     }
 
+    /**
+     * Generate all hierarchical paths from partition spec.
+     *
+     * <p>For example, if the partition spec is (pt1: '0601', pt2: '12', pt3: 
'30'), this method
+     * will return a list (index 0 first; each entry ends with {@code Path.SEPARATOR}):
+     *
+     * <ul>
+     *   <li>pt1=0601
+     *   <li>pt1=0601/pt2=12
+     *   <li>pt1=0601/pt2=12/pt3=30
+     * </ul>
+     */
+    public static List<String> generateHierarchicalPartitionPaths(
+            LinkedHashMap<String, String> partitionSpec) {
+        List<String> paths = new ArrayList<>();
+        if (partitionSpec.isEmpty()) {
+            return paths;
+        }
+        StringBuilder suffixBuf = new StringBuilder();
+        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
+            suffixBuf.append(escapePathName(e.getKey()));
+            suffixBuf.append('=');
+            suffixBuf.append(escapePathName(e.getValue()));
+            suffixBuf.append(Path.SEPARATOR);
+            paths.add(suffixBuf.toString());
+        }
+        return paths;
+    }
+
     /**
      * Escapes a path name.
      *
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 9c860928f..85ec2b221 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -117,6 +117,10 @@ public class TestFileStore extends KeyValueFileStore {
         this.commitIdentifier = 0L;
     }
 
+    public AbstractFileStoreWrite<KeyValue> newWrite() {
+        return super.newWrite(commitUser);
+    }
+
     public FileStoreCommitImpl newCommit() {
         return super.newCommit(commitUser);
     }
@@ -217,7 +221,7 @@ public class TestFileStore extends KeyValueFileStore {
             Long watermark,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
-        AbstractFileStoreWrite<KeyValue> write = newWrite(commitUser);
+        AbstractFileStoreWrite<KeyValue> write = newWrite();
         Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
         for (KeyValue kv : kvs) {
             BinaryRow partition = partitionCalculator.apply(kv);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java 
b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
index eb5590679..3030ff980 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
@@ -214,6 +214,32 @@ public class TestKeyValueGenerator {
                         rowSerializer.toBinaryRow(convertToRow(order)).copy());
     }
 
+    // used for FileStoreExpireDeleteDirTest to generate data in specified 
partition
+    public KeyValue nextPartitionedData(RowKind kind, Object... partitionSpec) 
{
+        Order order = new Order();
+        switch (mode) {
+            case MULTI_PARTITIONED:
+                assert partitionSpec.length == 2;
+                order.dt = (String) partitionSpec[0];
+                order.hr = (int) partitionSpec[1];
+                break;
+            case SINGLE_PARTITIONED:
+                assert partitionSpec.length == 1;
+                order.dt = (String) partitionSpec[0];
+                break;
+            default:
+                // do nothing
+        }
+        return new KeyValue()
+                .replace(
+                        KEY_SERIALIZER
+                                .toBinaryRow(GenericRow.of(order.shopId, 
order.orderId))
+                                .copy(),
+                        sequenceNumber++,
+                        kind,
+                        rowSerializer.toBinaryRow(convertToRow(order)).copy());
+    }
+
     private InternalRow convertToRow(Order order) {
         List<Object> values =
                 new ArrayList<>(
@@ -307,8 +333,8 @@ public class TestKeyValueGenerator {
     }
 
     private class Order {
-        private final String dt;
-        private final int hr;
+        private String dt;
+        private int hr;
         private final int shopId;
         private final long orderId;
         @Nullable private Long itemId;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index c99868cb2..ef3342771 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.AfterEach;
@@ -34,7 +35,9 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
@@ -212,4 +215,45 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
             }
         }
     }
+
+    @Test
+    public void testExpireWithUpgradedFile() throws Exception {
+        // write & commit data
+        List<KeyValue> data = FileStoreTestUtils.partitionedData(5, gen, 
"0401", 8);
+        BinaryRow partition = gen.getPartition(data.get(0));
+        RecordWriter<KeyValue> writer = FileStoreTestUtils.writeData(store, 
data, partition, 0);
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers =
+                Collections.singletonMap(partition, 
Collections.singletonMap(0, writer));
+        FileStoreTestUtils.commitData(store, 0, writers);
+
+        // check
+        List<ManifestEntry> entries = store.newScan().plan().files();
+        assertThat(entries.size()).isEqualTo(1);
+        ManifestEntry entry = entries.get(0);
+        assertThat(entry.file().level()).isEqualTo(0);
+        Path dataFilePath1 =
+                new Path(store.pathFactory().bucketPath(partition, 0), 
entry.file().fileName());
+        FileStoreTestUtils.assertPathExists(fileIO, dataFilePath1);
+
+        // compact & commit
+        writer.compact(true);
+        writer.sync();
+        FileStoreTestUtils.commitData(store, 1, writers);
+
+        // check
+        entries = store.newScan().plan().files(FileKind.ADD);
+        assertThat(entries.size()).isEqualTo(1);
+        entry = entries.get(0);
+        // data file has been upgraded due to compact
+        assertThat(entry.file().level()).isEqualTo(5);
+        Path dataFilePath2 =
+                new Path(store.pathFactory().bucketPath(partition, 0), 
entry.file().fileName());
+        assertThat(dataFilePath1).isEqualTo(dataFilePath2);
+        FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
+
+        // the data file still exists after expire
+        FileStoreExpire expire = store.newExpire(1, 1, Long.MAX_VALUE);
+        expire.expire();
+        FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
new file mode 100644
index 000000000..f7c6bd223
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RecordWriter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
+import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
+import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
+import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+
+/**
+ * Tests for {@link FileStoreExpireImpl}. After expiration, empty data file 
directories (buckets and
+ * partitions) are deleted. It doesn't extend {@link FileStoreExpireTestBase} 
because there is not
+ * much code that can be reused.
+ */
+public class FileStoreExpireDeleteDirTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private final FileIO fileIO = new LocalFileIO();
+
+    private long commitIdentifier;
+    private String root;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        commitIdentifier = 0L;
+        root = tempDir.toString();
+    }
+
+    /**
+     * This test checks FileStoreExpire can delete empty partition directories 
in multiple partition
+     * situation. The partition keys are (dt, hr). Test process:
+     *
+     * <ul>
+     *   <li>1. Generate snapshot 1 with (0401, 8/12), (0402, 8/12). Each 
partition has two buckets.
+     *   <li>2. Generate snapshot 2 by deleting all data of partition dt=0401 
(thus directory
+     *       dt=0401 will be deleted after expiring).
+     *   <li>3. Generate snapshot 3 by deleting all data of partition 
dt=0402/hr=8 (thus directory
+     *       dt=0402/hr=8 will be deleted after expiring).
+     *   <li>4. Generate snapshot 4 by deleting all data of partition 
dt=0402/hr=12/bucket-0 (thus
+     *       directory dt=0402/hr=12/bucket-0 will be deleted after expiring).
+     *   <li>5. Expire snapshot 1-3 (dt=0402/hr=12/bucket-1 survives) and 
check.
+     * </ul>
+     */
+    @Test
+    public void testMultiPartitions() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED);
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED);
+        FileStorePathFactory pathFactory = store.pathFactory();
+
+        // step 1: generate snapshot 1 by writing 5 randomly generated records 
to each bucket
+        // writers for each bucket
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+
+        List<BinaryRow> partitions = new ArrayList<>();
+        for (String dt : Arrays.asList("0401", "0402")) {
+            for (int hr : Arrays.asList(8, 12)) {
+                for (int bucket : Arrays.asList(0, 1)) {
+                    List<KeyValue> kvs = partitionedData(5, gen, dt, hr);
+                    BinaryRow partition = gen.getPartition(kvs.get(0));
+                    partitions.add(partition);
+                    writeData(store, kvs, partition, bucket, writers);
+                }
+            }
+        }
+
+        commitData(store, commitIdentifier++, writers);
+        // check all paths exist
+        for (BinaryRow partition : partitions) {
+            for (int bucket : Arrays.asList(0, 1)) {
+                assertPathExists(fileIO, pathFactory.bucketPath(partition, 
bucket));
+            }
+        }
+
+        // step 2: generate snapshot 2 by cleaning partition dt=0401 (through 
overwriting with an
+        // empty ManifestCommittable)
+        FileStoreCommitImpl commit = store.newCommit();
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("dt", "0401");
+        commit.overwrite(
+                partitionSpec, new ManifestCommittable(commitIdentifier++), 
Collections.emptyMap());
+
+        // step 3: generate snapshot 3 by cleaning partition dt=0402/hr=8
+        partitionSpec.put("dt", "0402");
+        partitionSpec.put("hr", "8");
+        commit.overwrite(
+                partitionSpec, new ManifestCommittable(commitIdentifier++), 
Collections.emptyMap());
+
+        // step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
+        // manually make delete ManifestEntry
+        BinaryRow partition = partitions.get(7);
+        Predicate partitionFilter =
+                PredicateBuilder.equalPartition(partition, 
TestKeyValueGenerator.DEFAULT_PART_TYPE);
+        List<ManifestEntry> bucketEntries =
+                store.newScan()
+                        .withSnapshot(3)
+                        .withPartitionFilter(partitionFilter)
+                        .withBucket(0)
+                        .plan()
+                        .files();
+        List<ManifestEntry> delete =
+                bucketEntries.stream()
+                        .map(
+                                entry ->
+                                        new ManifestEntry(
+                                                FileKind.DELETE, partition, 0, 
2, entry.file()))
+                        .collect(Collectors.toList());
+        // commit
+        commit.tryCommitOnce(
+                delete,
+                Collections.emptyList(),
+                commitIdentifier++,
+                null,
+                Collections.emptyMap(),
+                Snapshot.CommitKind.APPEND,
+                3L,
+                null);
+
+        // step 5: expire and check file paths
+        store.newExpire(1, 1, Long.MAX_VALUE).expire();
+        // whole dt=0401 is deleted
+        assertPathNotExists(fileIO, new Path(root, "dt=0401"));
+        // whole dt=0402/hr=8 is deleted
+        assertPathNotExists(fileIO, new Path(root, "dt=0402/hr=8"));
+        // for dt=0402/hr=12, bucket-0 is deleted but bucket-1 survives
+        assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+    }
+
+    // only bucket directories exist
+    @Test
+    public void testNoPartitions() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        FileStorePathFactory pathFactory = store.pathFactory();
+
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        for (int bucket : Arrays.asList(0, 1)) {
+            List<KeyValue> kvs = partitionedData(5, gen);
+            BinaryRow partition = gen.getPartition(kvs.get(0));
+            writeData(store, kvs, partition, bucket, writers);
+        }
+        commitData(store, commitIdentifier++, writers);
+
+        // cleaning bucket 0
+        List<ManifestEntry> bucketEntries =
+                store.newScan().withSnapshot(1).withBucket(0).plan().files();
+        BinaryRow partition = gen.getPartition(gen.next());
+        List<ManifestEntry> delete =
+                bucketEntries.stream()
+                        .map(
+                                entry ->
+                                        new ManifestEntry(
+                                                FileKind.DELETE, partition, 0, 
2, entry.file()))
+                        .collect(Collectors.toList());
+        // commit
+        store.newCommit()
+                .tryCommitOnce(
+                        delete,
+                        Collections.emptyList(),
+                        commitIdentifier++,
+                        null,
+                        Collections.emptyMap(),
+                        Snapshot.CommitKind.APPEND,
+                        1L,
+                        null);
+
+        // check before expiring
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+
+        // check after expiring
+        store.newExpire(1, 1, Long.MAX_VALUE).expire();
+        assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+    }
+
+    private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode 
mode) throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        CoreOptions.ChangelogProducer changelogProducer;
+        if (random.nextBoolean()) {
+            changelogProducer = CoreOptions.ChangelogProducer.INPUT;
+        } else {
+            changelogProducer = CoreOptions.ChangelogProducer.NONE;
+        }
+
+        RowType rowType, partitionType;
+        switch (mode) {
+            case NON_PARTITIONED:
+                rowType = TestKeyValueGenerator.NON_PARTITIONED_ROW_TYPE;
+                partitionType = 
TestKeyValueGenerator.NON_PARTITIONED_PART_TYPE;
+                break;
+            case SINGLE_PARTITIONED:
+                rowType = TestKeyValueGenerator.SINGLE_PARTITIONED_ROW_TYPE;
+                partitionType = 
TestKeyValueGenerator.SINGLE_PARTITIONED_PART_TYPE;
+                break;
+            case MULTI_PARTITIONED:
+                rowType = TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+                partitionType = TestKeyValueGenerator.DEFAULT_PART_TYPE;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported generator 
mode: " + mode);
+        }
+
+        SchemaManager schemaManager = new SchemaManager(fileIO, new 
Path(root));
+        schemaManager.createTable(
+                new Schema(
+                        rowType.getFields(),
+                        partitionType.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(mode),
+                        Collections.emptyMap(),
+                        null));
+
+        return new TestFileStore.Builder(
+                        "avro",
+                        root,
+                        2,
+                        partitionType,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        rowType,
+                        
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                        DeduplicateMergeFunction.factory())
+                .changelogProducer(changelogProducer)
+                .build();
+    }
+
+    private void writeData(
+            TestFileStore store,
+            List<KeyValue> kvs,
+            BinaryRow partition,
+            int bucket,
+            Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
+            throws Exception {
+        writers.computeIfAbsent(partition, p -> new HashMap<>())
+                .put(bucket, FileStoreTestUtils.writeData(store, kvs, 
partition, bucket));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
new file mode 100644
index 000000000..e4743f770
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.RecordWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Utils for {@link FileStore} of {@link KeyValue}, such as writing and committing. */
+public class FileStoreTestUtils {
+
+    /**
+     * Generates {@code num} {@link RowKind#INSERT} records for one partition.
+     *
+     * @param num number of records to generate; a value of zero or less yields an empty list
+     * @param gen generator that produces the records
+     * @param partitionSpec partition values, passed through to the generator unchanged
+     * @return a new mutable list holding the generated records in generation order
+     */
+    public static List<KeyValue> partitionedData(
+            int num, TestKeyValueGenerator gen, Object... partitionSpec) {
+        List<KeyValue> keyValues = new ArrayList<>();
+        for (int i = 0; i < num; i++) {
+            keyValues.add(gen.nextPartitionedData(RowKind.INSERT, partitionSpec));
+        }
+        return keyValues;
+    }
+
+    /**
+     * Asserts that {@code path} exists according to {@code fileIO}.
+     *
+     * @throws IOException if the existence check itself fails
+     */
+    public static void assertPathExists(FileIO fileIO, Path path) throws IOException {
+        // Name the path in the description so a failing assertion shows which path is missing.
+        assertThat(fileIO.exists(path)).as("Path %s should exist", path).isTrue();
+    }
+
+    /**
+     * Asserts that {@code path} does not exist according to {@code fileIO}.
+     *
+     * @throws IOException if the existence check itself fails
+     */
+    public static void assertPathNotExists(FileIO fileIO, Path path) throws IOException {
+        // Name the path in the description so a failing assertion shows which path was left over.
+        assertThat(fileIO.exists(path)).as("Path %s should not exist", path).isFalse();
+    }
+
+    // --------------------------------------------------------------------------------
+    // writeData & commitData are copied from TestFileStore#commitDataImpl and modified
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Creates a {@link RecordWriter} for the given partition and bucket and writes all of {@code
+     * keyValues} to it.
+     *
+     * <p>The writer is returned open; the caller is responsible for committing and closing it, for
+     * example through {@link #commitData}. If setting up the writer or writing a record fails, the
+     * writer is closed before the failure is rethrown.
+     *
+     * @param store file store that supplies the writer
+     * @param keyValues records to write, in iteration order
+     * @param partition partition the writer is created for
+     * @param bucket bucket the writer is created for
+     * @return the open writer holding the written records
+     * @throws Exception if the writer cannot be set up or a record cannot be written
+     */
+    public static RecordWriter<KeyValue> writeData(
+            TestFileStore store, List<KeyValue> keyValues, BinaryRow partition, int bucket)
+            throws Exception {
+        AbstractFileStoreWrite<KeyValue> write = store.newWrite();
+        RecordWriter<KeyValue> writer =
+                write.createWriterContainer(partition, bucket, false).writer;
+        try {
+            // The writer is expected to be a MemoryOwner; give it a heap-backed memory pool
+            // before the first record is written.
+            ((MemoryOwner) writer)
+                    .setMemoryPool(
+                            new HeapMemorySegmentPool(
+                                    TestFileStore.WRITE_BUFFER_SIZE.getBytes(),
+                                    (int) TestFileStore.PAGE_SIZE.getBytes()));
+            for (KeyValue kv : keyValues) {
+                writer.write(kv);
+            }
+        } catch (Exception e) {
+            // The caller never receives the writer on failure, so it must be released here.
+            closeSuppressing(writer, e);
+            throw e;
+        }
+        return writer;
+    }
+
+    /**
+     * Prepares a commit on every writer, commits all results as one {@link ManifestCommittable},
+     * and finally syncs and closes every writer.
+     *
+     * <p>Every writer is synced and closed even if preparing or committing fails, or if another
+     * writer fails to close. A commit failure is rethrown unchanged, with any close failures
+     * attached as suppressed exceptions. If only closing fails, the first close failure is
+     * rethrown wrapped in a {@link RuntimeException}, with later ones attached as suppressed.
+     *
+     * @param store file store that supplies the {@link FileStoreCommit}
+     * @param commitIdentifier identifier given to the {@link ManifestCommittable}
+     * @param writers open writers keyed by partition and then by bucket
+     * @throws Exception if preparing the commit or committing fails
+     */
+    public static void commitData(
+            TestFileStore store,
+            long commitIdentifier,
+            Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
+            throws Exception {
+        Exception failure = null;
+        try {
+            FileStoreCommit commit = store.newCommit();
+            ManifestCommittable committable = new ManifestCommittable(commitIdentifier, null);
+            for (Map.Entry<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
+                    writers.entrySet()) {
+                for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
+                        entryWithPartition.getValue().entrySet()) {
+                    CommitIncrement increment = entryWithBucket.getValue().prepareCommit(false);
+                    committable.addFileCommittable(
+                            new CommitMessageImpl(
+                                    entryWithPartition.getKey(),
+                                    entryWithBucket.getKey(),
+                                    increment.newFilesIncrement(),
+                                    increment.compactIncrement()));
+                }
+            }
+
+            commit.commit(committable, Collections.emptyMap());
+        } catch (Exception e) {
+            // Remember the failure, but still release every writer below.
+            failure = e;
+        }
+
+        for (Map<Integer, RecordWriter<KeyValue>> writersOfPartition : writers.values()) {
+            for (RecordWriter<KeyValue> writer : writersOfPartition.values()) {
+                try {
+                    syncAndClose(writer);
+                } catch (Exception e) {
+                    if (failure == null) {
+                        failure = new RuntimeException(e);
+                    } else {
+                        failure.addSuppressed(e);
+                    }
+                }
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /** Syncs {@code writer} and then closes it; the writer is closed even if syncing fails. */
+    private static void syncAndClose(RecordWriter<KeyValue> writer) throws Exception {
+        try {
+            // wait for compaction to end, otherwise orphan files may occur
+            // see CompactManager#cancelCompaction for more info
+            writer.sync();
+        } catch (Exception e) {
+            closeSuppressing(writer, e);
+            throw e;
+        }
+        writer.close();
+    }
+
+    /** Closes {@code writer}, attaching any close failure to {@code primary} as suppressed. */
+    private static void closeSuppressing(RecordWriter<KeyValue> writer, Exception primary) {
+        try {
+            writer.close();
+        } catch (Exception closeFailure) {
+            primary.addSuppressed(closeFailure);
+        }
+    }
+}


Reply via email to