This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3329e4872 [core] clean up files generated by compactManager when AppendOnlyWriter is closed. (#1948)
3329e4872 is described below

commit 3329e48728531147cef3e35e80d2e8a4839785fb
Author: liming.1018 <[email protected]>
AuthorDate: Wed Sep 6 13:37:39 2023 +0800

    [core] clean up files generated by compactManager when AppendOnlyWriter is closed. (#1948)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java |  7 +++
 .../apache/paimon/append/AppendOnlyWriterTest.java | 65 ++++++++++++++++++++--
 2 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 067520949..84fd01d73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -162,6 +162,13 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
         compactManager.cancelCompaction();
         sync();
 
+        compactManager.close();
+        for (DataFileMeta file : compactAfter) {
+            // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we
+            // can directly delete the file in compactAfter.
+            fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+        }
+
         if (writer != null) {
             writer.abort();
             writer = null;
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index cf06c3884..680e483e2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -46,12 +46,17 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -264,6 +269,43 @@ public class AppendOnlyWriterTest {
         assertThat(secInc.newFilesIncrement().newFiles()).hasSize(1);
     }
 
+    @Test
+    public void testCloseUnexpectedly() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        AppendOnlyWriter writer =
+                createWriter(20 * 1024, false, Collections.emptyList(), latch).getLeft();
+
+        for (int i = 0; i < 10; i++) { // create 10 small files.
+            for (int j = 0; j < 10; j++) {
+                writer.write(row(j, String.format("%03d", j), PART));
+            }
+            CommitIncrement increment = writer.prepareCommit(false);
+            assertThat(increment.compactIncrement().isEmpty()).isTrue();
+        }
+        Set<Path> committedFiles =
+                Files.walk(tempDir)
+                        .filter(Files::isRegularFile)
+                        .map(p -> new Path(p.toString()))
+                        .collect(Collectors.toSet());
+
+        // start compaction and write more records
+        latch.countDown();
+        for (int j = 0; j < 10; j++) {
+            writer.write(row(j, String.format("%03d", j), PART));
+        }
+        writer.sync();
+
+        // writer closed unexpectedly
+        writer.close();
+
+        Set<Path> afterClosedUnexpectedly =
+                Files.walk(tempDir)
+                        .filter(Files::isRegularFile)
+                        .map(p -> new Path(p.toString()))
+                        .collect(Collectors.toSet());
+        assertThat(afterClosedUnexpectedly).containsExactlyInAnyOrderElementsOf(committedFiles);
+    }
+
     private FieldStats initStats(Integer min, Integer max, long nullCount) {
         return new FieldStats(min, max, nullCount);
     }
@@ -291,6 +333,14 @@ public class AppendOnlyWriterTest {
 
     private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
             long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
+        return createWriter(targetFileSize, forceCompact, scannedFiles, new CountDownLatch(0));
+    }
+
+    private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
+            long targetFileSize,
+            boolean forceCompact,
+            List<DataFileMeta> scannedFiles,
+            CountDownLatch latch) {
         FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
         LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
         AppendOnlyCompactManager compactManager =
@@ -301,11 +351,13 @@ public class AppendOnlyWriterTest {
                         MIN_FILE_NUM,
                         MAX_FILE_NUM,
                         targetFileSize,
-                        compactBefore ->
-                                compactBefore.isEmpty()
-                                        ? Collections.emptyList()
-                                        : Collections.singletonList(
-                                                generateCompactAfter(compactBefore)));
+                        compactBefore -> {
+                            latch.await();
+                            return compactBefore.isEmpty()
+                                    ? Collections.emptyList()
+                                    : Collections.singletonList(
+                                            generateCompactAfter(compactBefore));
+                        });
         AppendOnlyWriter writer =
                 new AppendOnlyWriter(
                         LocalFileIO.create(),
@@ -325,11 +377,12 @@ public class AppendOnlyWriterTest {
         return Pair.of(writer, compactManager.allFiles());
     }
 
-    private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
+    private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) throws IOException {
         int size = toCompact.size();
         long minSeq = toCompact.get(0).minSequenceNumber();
         long maxSeq = toCompact.get(size - 1).maxSequenceNumber();
         String fileName = "compact-" + UUID.randomUUID();
+        LocalFileIO.create().newOutputStream(pathFactory.toPath(fileName), false).close();
         return DataFileMeta.forAppend(
                 fileName,
                 toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(),

Reply via email to