This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3329e4872 [core] clean up files generated by compactManager when
AppendOnlyWriter is closed. (#1948)
3329e4872 is described below
commit 3329e48728531147cef3e35e80d2e8a4839785fb
Author: liming.1018 <[email protected]>
AuthorDate: Wed Sep 6 13:37:39 2023 +0800
[core] clean up files generated by compactManager when AppendOnlyWriter is
closed. (#1948)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 7 +++
.../apache/paimon/append/AppendOnlyWriterTest.java | 65 ++++++++++++++++++++--
2 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 067520949..84fd01d73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -162,6 +162,13 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
compactManager.cancelCompaction();
sync();
+ compactManager.close();
+ for (DataFileMeta file : compactAfter) {
+ // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we
+ // can directly delete the file in compactAfter.
+ fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+ }
+
if (writer != null) {
writer.abort();
writer = null;
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index cf06c3884..680e483e2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -46,12 +46,17 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
import static org.assertj.core.api.Assertions.assertThat;
@@ -264,6 +269,43 @@ public class AppendOnlyWriterTest {
assertThat(secInc.newFilesIncrement().newFiles()).hasSize(1);
}
+ @Test
+ public void testCloseUnexpectedly() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ AppendOnlyWriter writer =
+ createWriter(20 * 1024, false, Collections.emptyList(), latch).getLeft();
+
+ for (int i = 0; i < 10; i++) { // create 10 small files.
+ for (int j = 0; j < 10; j++) {
+ writer.write(row(j, String.format("%03d", j), PART));
+ }
+ CommitIncrement increment = writer.prepareCommit(false);
+ assertThat(increment.compactIncrement().isEmpty()).isTrue();
+ }
+ Set<Path> committedFiles =
+ Files.walk(tempDir)
+ .filter(Files::isRegularFile)
+ .map(p -> new Path(p.toString()))
+ .collect(Collectors.toSet());
+
+ // start compaction and write more records
+ latch.countDown();
+ for (int j = 0; j < 10; j++) {
+ writer.write(row(j, String.format("%03d", j), PART));
+ }
+ writer.sync();
+
+ // writer closed unexpectedly
+ writer.close();
+
+ Set<Path> afterClosedUnexpectedly =
+ Files.walk(tempDir)
+ .filter(Files::isRegularFile)
+ .map(p -> new Path(p.toString()))
+ .collect(Collectors.toSet());
+ assertThat(afterClosedUnexpectedly).containsExactlyInAnyOrderElementsOf(committedFiles);
+ }
+
private FieldStats initStats(Integer min, Integer max, long nullCount) {
return new FieldStats(min, max, nullCount);
}
@@ -291,6 +333,14 @@ public class AppendOnlyWriterTest {
private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
+ return createWriter(targetFileSize, forceCompact, scannedFiles, new CountDownLatch(0));
+ }
+
+ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
+ long targetFileSize,
+ boolean forceCompact,
+ List<DataFileMeta> scannedFiles,
+ CountDownLatch latch) {
FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
AppendOnlyCompactManager compactManager =
@@ -301,11 +351,13 @@ public class AppendOnlyWriterTest {
MIN_FILE_NUM,
MAX_FILE_NUM,
targetFileSize,
- compactBefore ->
- compactBefore.isEmpty()
- ? Collections.emptyList()
- : Collections.singletonList(
- generateCompactAfter(compactBefore)));
+ compactBefore -> {
+ latch.await();
+ return compactBefore.isEmpty()
+ ? Collections.emptyList()
+ : Collections.singletonList(
+ generateCompactAfter(compactBefore));
+ });
AppendOnlyWriter writer =
new AppendOnlyWriter(
LocalFileIO.create(),
@@ -325,11 +377,12 @@ public class AppendOnlyWriterTest {
return Pair.of(writer, compactManager.allFiles());
}
- private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
+ private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) throws IOException {
int size = toCompact.size();
long minSeq = toCompact.get(0).minSequenceNumber();
long maxSeq = toCompact.get(size - 1).maxSequenceNumber();
String fileName = "compact-" + UUID.randomUUID();
+ LocalFileIO.create().newOutputStream(pathFactory.toPath(fileName), false).close();
return DataFileMeta.forAppend(
fileName,
toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(),