This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1ebe09958 [core][bug] The empty data file directories are not deleted
after expiring file store (#852)
1ebe09958 is described below
commit 1ebe099589c29d8b2e601f3d19e8e2cdedf77a2f
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 12 11:50:57 2023 +0800
[core][bug] The empty data file directories are not deleted after expiring
file store (#852)
---
.../main/java/org/apache/paimon/utils/Triple.java | 65 +++++
.../paimon/operation/FileStoreCommitImpl.java | 4 +-
.../paimon/operation/FileStoreExpireImpl.java | 80 +++++-
.../apache/paimon/utils/FileStorePathFactory.java | 13 +
.../apache/paimon/utils/PartitionPathUtils.java | 31 +++
.../test/java/org/apache/paimon/TestFileStore.java | 6 +-
.../org/apache/paimon/TestKeyValueGenerator.java | 30 ++-
.../operation/CleanedFileStoreExpireTest.java | 44 ++++
.../operation/FileStoreExpireDeleteDirTest.java | 290 +++++++++++++++++++++
.../paimon/operation/FileStoreTestUtils.java | 125 +++++++++
10 files changed, 676 insertions(+), 12 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Triple.java
b/paimon-common/src/main/java/org/apache/paimon/utils/Triple.java
new file mode 100644
index 000000000..8068c0624
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Triple.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Container that accommodates three fields. */
+public final class Triple<T0, T1, T2> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public final T0 f0;
+ public final T1 f1;
+ public final T2 f2;
+
+ public static <T0, T1, T2> Triple<T0, T1, T2> of(T0 f0, T1 f1, T2 f2) {
+ return new Triple<>(f0, f1, f2);
+ }
+
+ private Triple(final T0 f0, final T1 f1, final T2 f2) {
+ this.f0 = f0;
+ this.f1 = f1;
+ this.f2 = f2;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Triple<?, ?, ?> triple = (Triple<?, ?, ?>) o;
+ return Objects.equals(f0, triple.f0)
+ && Objects.equals(f1, triple.f1)
+ && Objects.equals(f2, triple.f2);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(f0, f1, f2);
+ }
+
+ @Override
+ public String toString() {
+ return "(" + f0 + ',' + f1 + ',' + f2 + ')';
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index bc9b57d4c..1d307896d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
@@ -479,7 +480,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
}
- private boolean tryCommitOnce(
+ @VisibleForTesting
+ public boolean tryCommitOnce(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
long identifier,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index d4e1e1754..29c0e3356 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
@@ -29,10 +30,12 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -161,12 +164,19 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next
snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
+ Map<BinaryRow, Set<Integer>> changedBuckets = new HashMap<>();
for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete merge tree files not used by
snapshot #" + id);
}
Snapshot snapshot = snapshotManager.snapshot(id);
- expireMergeTreeFiles(snapshot.deltaManifestList());
+ // expire merge tree files and collect changed buckets
+ expireMergeTreeFiles(snapshot.deltaManifestList())
+ .forEach(
+ (partition, buckets) ->
+ changedBuckets
+ .computeIfAbsent(partition, p ->
new HashSet<>())
+ .addAll(buckets));
}
// delete changelog files
@@ -180,6 +190,10 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
}
}
+ // data files and changelog files in bucket directories have been
deleted,
+ // so delete the changed bucket directories if they are now empty
+ tryDeleteDirectories(changedBuckets);
+
// delete manifests
Snapshot exclusiveSnapshot = snapshotManager.snapshot(endExclusiveId);
Set<ManifestFileMeta> manifestsInUse =
@@ -226,15 +240,17 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
writeEarliestHint(endExclusiveId);
}
- private void expireMergeTreeFiles(String manifestListName) {
-
expireMergeTreeFiles(getManifestEntriesFromManifestList(manifestListName));
+ // returns a map from partition to the buckets in which some data files have been
deleted
+ private Map<BinaryRow, Set<Integer>> expireMergeTreeFiles(String
manifestListName) {
+ return
expireMergeTreeFiles(getManifestEntriesFromManifestList(manifestListName));
}
@VisibleForTesting
- void expireMergeTreeFiles(Iterable<ManifestEntry> dataFileLog) {
+ Map<BinaryRow, Set<Integer>> expireMergeTreeFiles(Iterable<ManifestEntry>
dataFileLog) {
// we cannot delete a data file directly when we meet a DELETE entry,
because that
// file might be upgraded
- Map<Path, List<Path>> dataFileToDelete = new HashMap<>();
+ // data file path -> (partition, bucket, extra file paths)
+ Map<Path, Triple<BinaryRow, Integer, List<Path>>> dataFileToDelete =
new HashMap<>();
for (ManifestEntry entry : dataFileLog) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
Path dataFilePath = new Path(bucketPath, entry.file().fileName());
@@ -247,18 +263,26 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
for (String file : entry.file().extraFiles()) {
extraFiles.add(new Path(bucketPath, file));
}
- dataFileToDelete.put(dataFilePath, extraFiles);
+ dataFileToDelete.put(
+ dataFilePath, Triple.of(entry.partition(),
entry.bucket(), extraFiles));
break;
default:
throw new UnsupportedOperationException(
"Unknown value kind " + entry.kind().name());
}
}
+
+ Map<BinaryRow, Set<Integer>> changedBuckets = new HashMap<>();
dataFileToDelete.forEach(
- (path, extraFiles) -> {
+ (path, triple) -> {
+ // delete data files
fileIO.deleteQuietly(path);
- extraFiles.forEach(fileIO::deleteQuietly);
+ triple.f2.forEach(fileIO::deleteQuietly);
+ // record changed buckets
+ changedBuckets.computeIfAbsent(triple.f0, p -> new
HashSet<>()).add(triple.f1);
});
+
+ return changedBuckets;
}
private void expireChangelogFiles(String manifestListName) {
@@ -327,4 +351,44 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
throw new RuntimeException(e);
}
}
+
+ private void tryDeleteDirectories(Map<BinaryRow, Set<Integer>>
changedBuckets) {
+ // Parent partition directories are deduplicated and grouped by hierarchy level
+ Map<Integer, Set<Path>> deduplicate = new HashMap<>();
+ for (Map.Entry<BinaryRow, Set<Integer>> entry :
changedBuckets.entrySet()) {
+ // try to delete bucket directories
+ for (Integer bucket : entry.getValue()) {
+ tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(),
bucket));
+ }
+
+ List<Path> hierarchicalPaths =
pathFactory.getHierarchicalPartitionPath(entry.getKey());
+ int hierarchies = hierarchicalPaths.size();
+ if (hierarchies == 0) {
+ continue;
+ }
+
+ if (tryDeleteEmptyDirectory(hierarchicalPaths.get(hierarchies -
1))) {
+ // deduplicate high level partition directories
+ for (int hierarchy = 0; hierarchy < hierarchies - 1;
hierarchy++) {
+ Path path = hierarchicalPaths.get(hierarchy);
+ deduplicate.computeIfAbsent(hierarchy, i -> new
HashSet<>()).add(path);
+ }
+ }
+ }
+
+ // from deepest to shallowest
+ for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0;
hierarchy--) {
+ deduplicate.get(hierarchy).forEach(this::tryDeleteEmptyDirectory);
+ }
+ }
+
+ private boolean tryDeleteEmptyDirectory(Path path) {
+ try {
+ fileIO.delete(path, false);
+ return true;
+ } catch (IOException e) {
+ LOG.debug("Failed to delete directory '{}'. Check whether it is
empty.", path);
+ return false;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 242dd8648..cf4220b8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -28,8 +28,10 @@ import org.apache.paimon.types.RowType;
import javax.annotation.concurrent.ThreadSafe;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.paimon.options.ConfigOptions.key;
@@ -124,6 +126,17 @@ public class FileStorePathFactory {
partition, "Partition row data is null. This
is unexpected.")));
}
+ public List<Path> getHierarchicalPartitionPath(BinaryRow partition) {
+ return PartitionPathUtils.generateHierarchicalPartitionPaths(
+ partitionComputer.generatePartValues(
+ Preconditions.checkNotNull(
+ partition,
+ "Partition binary row is null. This is
unexpected.")))
+ .stream()
+ .map(p -> new Path(root + "/" + p))
+ .collect(Collectors.toList());
+ }
+
@VisibleForTesting
public String uuid() {
return uuid;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index 113c15daa..30e359ffd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -20,8 +20,10 @@ package org.apache.paimon.utils;
import org.apache.paimon.fs.Path;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/** Utils for file system. */
@@ -81,6 +83,35 @@ public class PartitionPathUtils {
return suffixBuf.toString();
}
+ /**
+ * Generate all hierarchical paths from partition spec.
+ *
+ * <p>For example, if the partition spec is (pt1: '0601', pt2: '12', pt3:
'30'), this method
+ * will return a list (starting from index 0) in which each path ends with a separator:
+ *
+ * <ul>
+ * <li>pt1=0601/
+ * <li>pt1=0601/pt2=12/
+ * <li>pt1=0601/pt2=12/pt3=30/
+ * </ul>
+ */
+ public static List<String> generateHierarchicalPartitionPaths(
+ LinkedHashMap<String, String> partitionSpec) {
+ List<String> paths = new ArrayList<>();
+ if (partitionSpec.isEmpty()) {
+ return paths;
+ }
+ StringBuilder suffixBuf = new StringBuilder();
+ for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
+ suffixBuf.append(escapePathName(e.getKey()));
+ suffixBuf.append('=');
+ suffixBuf.append(escapePathName(e.getValue()));
+ suffixBuf.append(Path.SEPARATOR);
+ paths.add(suffixBuf.toString());
+ }
+ return paths;
+ }
+
/**
* Escapes a path name.
*
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 9c860928f..85ec2b221 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -117,6 +117,10 @@ public class TestFileStore extends KeyValueFileStore {
this.commitIdentifier = 0L;
}
+ public AbstractFileStoreWrite<KeyValue> newWrite() {
+ return super.newWrite(commitUser);
+ }
+
public FileStoreCommitImpl newCommit() {
return super.newCommit(commitUser);
}
@@ -217,7 +221,7 @@ public class TestFileStore extends KeyValueFileStore {
Long watermark,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
- AbstractFileStoreWrite<KeyValue> write = newWrite(commitUser);
+ AbstractFileStoreWrite<KeyValue> write = newWrite();
Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
for (KeyValue kv : kvs) {
BinaryRow partition = partitionCalculator.apply(kv);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
index eb5590679..3030ff980 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java
@@ -214,6 +214,32 @@ public class TestKeyValueGenerator {
rowSerializer.toBinaryRow(convertToRow(order)).copy());
}
+ // used by FileStoreExpireDeleteDirTest to generate data in the specified
partition
+ public KeyValue nextPartitionedData(RowKind kind, Object... partitionSpec)
{
+ Order order = new Order();
+ switch (mode) {
+ case MULTI_PARTITIONED:
+ assert partitionSpec.length == 2;
+ order.dt = (String) partitionSpec[0];
+ order.hr = (int) partitionSpec[1];
+ break;
+ case SINGLE_PARTITIONED:
+ assert partitionSpec.length == 1;
+ order.dt = (String) partitionSpec[0];
+ break;
+ default:
+ // do nothing
+ }
+ return new KeyValue()
+ .replace(
+ KEY_SERIALIZER
+ .toBinaryRow(GenericRow.of(order.shopId,
order.orderId))
+ .copy(),
+ sequenceNumber++,
+ kind,
+ rowSerializer.toBinaryRow(convertToRow(order)).copy());
+ }
+
private InternalRow convertToRow(Order order) {
List<Object> values =
new ArrayList<>(
@@ -307,8 +333,8 @@ public class TestKeyValueGenerator {
}
private class Order {
- private final String dt;
- private final int hr;
+ private String dt;
+ private int hr;
private final int shopId;
private final long orderId;
@Nullable private Long itemId;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index c99868cb2..ef3342771 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.AfterEach;
@@ -34,7 +35,9 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
@@ -212,4 +215,45 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
}
}
}
+
+ @Test
+ public void testExpireWithUpgradedFile() throws Exception {
+ // write & commit data
+ List<KeyValue> data = FileStoreTestUtils.partitionedData(5, gen,
"0401", 8);
+ BinaryRow partition = gen.getPartition(data.get(0));
+ RecordWriter<KeyValue> writer = FileStoreTestUtils.writeData(store,
data, partition, 0);
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers =
+ Collections.singletonMap(partition,
Collections.singletonMap(0, writer));
+ FileStoreTestUtils.commitData(store, 0, writers);
+
+ // check
+ List<ManifestEntry> entries = store.newScan().plan().files();
+ assertThat(entries.size()).isEqualTo(1);
+ ManifestEntry entry = entries.get(0);
+ assertThat(entry.file().level()).isEqualTo(0);
+ Path dataFilePath1 =
+ new Path(store.pathFactory().bucketPath(partition, 0),
entry.file().fileName());
+ FileStoreTestUtils.assertPathExists(fileIO, dataFilePath1);
+
+ // compact & commit
+ writer.compact(true);
+ writer.sync();
+ FileStoreTestUtils.commitData(store, 1, writers);
+
+ // check
+ entries = store.newScan().plan().files(FileKind.ADD);
+ assertThat(entries.size()).isEqualTo(1);
+ entry = entries.get(0);
+ // data file has been upgraded due to compact
+ assertThat(entry.file().level()).isEqualTo(5);
+ Path dataFilePath2 =
+ new Path(store.pathFactory().bucketPath(partition, 0),
entry.file().fileName());
+ assertThat(dataFilePath1).isEqualTo(dataFilePath2);
+ FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
+
+ // the data file still exists after expire
+ FileStoreExpire expire = store.newExpire(1, 1, Long.MAX_VALUE);
+ expire.expire();
+ FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
new file mode 100644
index 000000000..f7c6bd223
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RecordWriter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
+import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
+import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
+import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+
+/**
+ * Tests for {@link FileStoreExpireImpl}. After expiration, empty data file
directories (buckets and
+ * partitions) are deleted. It does not extend {@link FileStoreExpireTestBase}
because little
+ * of its code could be reused.
+ */
+public class FileStoreExpireDeleteDirTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private final FileIO fileIO = new LocalFileIO();
+
+ private long commitIdentifier;
+ private String root;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ commitIdentifier = 0L;
+ root = tempDir.toString();
+ }
+
+ /**
+ * This test checks that FileStoreExpire can delete empty partition directories
for a table with multiple
+ * partition keys. The partition keys are (dt, hr). Test process:
+ *
+ * <ul>
+ * <li>1. Generate snapshot 1 with (0401, 8/12), (0402, 8/12). Each
partition has two buckets.
+ * <li>2. Generate snapshot 2 by deleting all data of partition dt=0401
(thus directory
+ * dt=0401 will be deleted after expiring).
+ * <li>3. Generate snapshot 3 by deleting all data of partition
dt=0402/hr=8 (thus directory
+ * dt=0402/hr=8 will be deleted after expiring).
+ * <li>4. Generate snapshot 4 by deleting all data of partition
dt=0402/hr=12/bucket-0 (thus
+ * directory dt=0402/hr=12/bucket-0 will be deleted after expiring).
+ * <li>5. Expire snapshots 1-3 (dt=0402/hr=12/bucket-1 survives) and
check.
+ * </ul>
+ */
+ @Test
+ public void testMultiPartitions() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED);
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED);
+ FileStorePathFactory pathFactory = store.pathFactory();
+
+ // step 1: generate snapshot 1 by writing 5 randomly generated records
to each bucket
+ // writers for each bucket
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+
+ List<BinaryRow> partitions = new ArrayList<>();
+ for (String dt : Arrays.asList("0401", "0402")) {
+ for (int hr : Arrays.asList(8, 12)) {
+ for (int bucket : Arrays.asList(0, 1)) {
+ List<KeyValue> kvs = partitionedData(5, gen, dt, hr);
+ BinaryRow partition = gen.getPartition(kvs.get(0));
+ partitions.add(partition);
+ writeData(store, kvs, partition, bucket, writers);
+ }
+ }
+ }
+
+ commitData(store, commitIdentifier++, writers);
+ // check all paths exist
+ for (BinaryRow partition : partitions) {
+ for (int bucket : Arrays.asList(0, 1)) {
+ assertPathExists(fileIO, pathFactory.bucketPath(partition,
bucket));
+ }
+ }
+
+ // step 2: generate snapshot 2 by cleaning partition dt=0401 (through
overwriting with an
+ // empty ManifestCommittable)
+ FileStoreCommitImpl commit = store.newCommit();
+ Map<String, String> partitionSpec = new HashMap<>();
+ partitionSpec.put("dt", "0401");
+ commit.overwrite(
+ partitionSpec, new ManifestCommittable(commitIdentifier++),
Collections.emptyMap());
+
+ // step 3: generate snapshot 3 by cleaning partition dt=0402/hr=8
+ partitionSpec.put("dt", "0402");
+ partitionSpec.put("hr", "8");
+ commit.overwrite(
+ partitionSpec, new ManifestCommittable(commitIdentifier++),
Collections.emptyMap());
+
+ // step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
+ // manually create DELETE ManifestEntries for the files in that bucket
+ BinaryRow partition = partitions.get(7);
+ Predicate partitionFilter =
+ PredicateBuilder.equalPartition(partition,
TestKeyValueGenerator.DEFAULT_PART_TYPE);
+ List<ManifestEntry> bucketEntries =
+ store.newScan()
+ .withSnapshot(3)
+ .withPartitionFilter(partitionFilter)
+ .withBucket(0)
+ .plan()
+ .files();
+ List<ManifestEntry> delete =
+ bucketEntries.stream()
+ .map(
+ entry ->
+ new ManifestEntry(
+ FileKind.DELETE, partition, 0,
2, entry.file()))
+ .collect(Collectors.toList());
+ // commit
+ commit.tryCommitOnce(
+ delete,
+ Collections.emptyList(),
+ commitIdentifier++,
+ null,
+ Collections.emptyMap(),
+ Snapshot.CommitKind.APPEND,
+ 3L,
+ null);
+
+ // step 5: expire and check file paths
+ store.newExpire(1, 1, Long.MAX_VALUE).expire();
+ // whole dt=0401 is deleted
+ assertPathNotExists(fileIO, new Path(root, "dt=0401"));
+ // whole dt=0402/hr=8 is deleted
+ assertPathNotExists(fileIO, new Path(root, "dt=0402/hr=8"));
+ // for dt=0402/hr=12, bucket-0 is deleted but bucket-1 survives
+ assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+ }
+
+ // non-partitioned table: only bucket directories exist
+ @Test
+ public void testNoPartitions() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ FileStorePathFactory pathFactory = store.pathFactory();
+
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+ for (int bucket : Arrays.asList(0, 1)) {
+ List<KeyValue> kvs = partitionedData(5, gen);
+ BinaryRow partition = gen.getPartition(kvs.get(0));
+ writeData(store, kvs, partition, bucket, writers);
+ }
+ commitData(store, commitIdentifier++, writers);
+
+ // cleaning bucket 0
+ List<ManifestEntry> bucketEntries =
+ store.newScan().withSnapshot(1).withBucket(0).plan().files();
+ BinaryRow partition = gen.getPartition(gen.next());
+ List<ManifestEntry> delete =
+ bucketEntries.stream()
+ .map(
+ entry ->
+ new ManifestEntry(
+ FileKind.DELETE, partition, 0,
2, entry.file()))
+ .collect(Collectors.toList());
+ // commit
+ store.newCommit()
+ .tryCommitOnce(
+ delete,
+ Collections.emptyList(),
+ commitIdentifier++,
+ null,
+ Collections.emptyMap(),
+ Snapshot.CommitKind.APPEND,
+ 1L,
+ null);
+
+ // check before expiring
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+
+ // check after expiring
+ store.newExpire(1, 1, Long.MAX_VALUE).expire();
+ assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+ }
+
+ private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode
mode) throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ CoreOptions.ChangelogProducer changelogProducer;
+ if (random.nextBoolean()) {
+ changelogProducer = CoreOptions.ChangelogProducer.INPUT;
+ } else {
+ changelogProducer = CoreOptions.ChangelogProducer.NONE;
+ }
+
+ RowType rowType, partitionType;
+ switch (mode) {
+ case NON_PARTITIONED:
+ rowType = TestKeyValueGenerator.NON_PARTITIONED_ROW_TYPE;
+ partitionType =
TestKeyValueGenerator.NON_PARTITIONED_PART_TYPE;
+ break;
+ case SINGLE_PARTITIONED:
+ rowType = TestKeyValueGenerator.SINGLE_PARTITIONED_ROW_TYPE;
+ partitionType =
TestKeyValueGenerator.SINGLE_PARTITIONED_PART_TYPE;
+ break;
+ case MULTI_PARTITIONED:
+ rowType = TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+ partitionType = TestKeyValueGenerator.DEFAULT_PART_TYPE;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported generator
mode: " + mode);
+ }
+
+ SchemaManager schemaManager = new SchemaManager(fileIO, new
Path(root));
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ partitionType.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(mode),
+ Collections.emptyMap(),
+ null));
+
+ return new TestFileStore.Builder(
+ "avro",
+ root,
+ 2,
+ partitionType,
+ TestKeyValueGenerator.KEY_TYPE,
+ rowType,
+
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory())
+ .changelogProducer(changelogProducer)
+ .build();
+ }
+
+ private void writeData(
+ TestFileStore store,
+ List<KeyValue> kvs,
+ BinaryRow partition,
+ int bucket,
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
+ throws Exception {
+ writers.computeIfAbsent(partition, p -> new HashMap<>())
+ .put(bucket, FileStoreTestUtils.writeData(store, kvs,
partition, bucket));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
new file mode 100644
index 000000000..e4743f770
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.RecordWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Utils for {@link FileStore} of {@link KeyValue}, such as writing and
committing. */
+public class FileStoreTestUtils {
+
+ // generate partitioned data
+ public static List<KeyValue> partitionedData(
+ int num, TestKeyValueGenerator gen, Object... partitionSpec) {
+ List<KeyValue> keyValues = new ArrayList<>();
+ for (int i = 0; i < num; i++) {
+ keyValues.add(gen.nextPartitionedData(RowKind.INSERT,
partitionSpec));
+ }
+ return keyValues;
+ }
+
+ public static void assertPathExists(FileIO fileIO, Path path) throws
IOException {
+ assertThat(fileIO.exists(path)).isTrue();
+ }
+
+ public static void assertPathNotExists(FileIO fileIO, Path path) throws
IOException {
+ assertThat(fileIO.exists(path)).isFalse();
+ }
+
+ //
--------------------------------------------------------------------------------
+ // writeData & commitData are copied from TestFileStore#commitDataImpl and
modified
+ //
--------------------------------------------------------------------------------
+
+ // create a RecordWriter and write data
+ public static RecordWriter<KeyValue> writeData(
+ TestFileStore store, List<KeyValue> keyValues, BinaryRow
partition, int bucket)
+ throws Exception {
+ AbstractFileStoreWrite<KeyValue> write = store.newWrite();
+ RecordWriter<KeyValue> writer =
+ write.createWriterContainer(partition, bucket, false).writer;
+ ((MemoryOwner) writer)
+ .setMemoryPool(
+ new HeapMemorySegmentPool(
+ TestFileStore.WRITE_BUFFER_SIZE.getBytes(),
+ (int) TestFileStore.PAGE_SIZE.getBytes()));
+ for (KeyValue kv : keyValues) {
+ writer.write(kv);
+ }
+ return writer;
+ }
+
+ // commit data in writers
+ public static void commitData(
+ TestFileStore store,
+ long commitIdentifier,
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
+ throws Exception {
+ FileStoreCommit commit = store.newCommit();
+ ManifestCommittable committable = new
ManifestCommittable(commitIdentifier, null);
+ for (Map.Entry<BinaryRow, Map<Integer, RecordWriter<KeyValue>>>
entryWithPartition :
+ writers.entrySet()) {
+ for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
+ entryWithPartition.getValue().entrySet()) {
+ CommitIncrement increment =
entryWithBucket.getValue().prepareCommit(false);
+ committable.addFileCommittable(
+ new CommitMessageImpl(
+ entryWithPartition.getKey(),
+ entryWithBucket.getKey(),
+ increment.newFilesIncrement(),
+ increment.compactIncrement()));
+ }
+ }
+
+ commit.commit(committable, Collections.emptyMap());
+
+ writers.values().stream()
+ .flatMap(m -> m.values().stream())
+ .forEach(
+ w -> {
+ try {
+ // wait for compaction to end, otherwise
orphan files may occur
+ // see CompactManager#cancelCompaction for
more info
+ w.sync();
+ w.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+}