This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ca88ed985 [core] Reduce small files while batch write append only table with a lot of partitions. (#3160)
ca88ed985 is described below

commit ca88ed98595fb5692cfc6555754531f83a671810
Author: YeJunHao <[email protected]>
AuthorDate: Sun Apr 7 15:02:17 2024 +0800

    [core] Reduce small files while batch write append only table with a lot of partitions. (#3160)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java | 23 +++++++++--
 .../paimon/operation/AppendOnlyFileStoreWrite.java | 27 +++++++++----
 .../apache/paimon/append/AppendOnlyWriterTest.java |  1 +
 .../apache/paimon/format/FileFormatSuffixTest.java |  1 +
 .../operation/AppendOnlyFileStoreWriteTest.java    | 46 ++++++++++++++++++++++
 5 files changed, 87 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4b00eae1f..9d9f41897 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -33,7 +33,9 @@ import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
@@ -64,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
     private final RowType writeSchema;
     private final DataFilePathFactory pathFactory;
     private final CompactManager compactManager;
+    private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead;
     private final boolean forceCompact;
     private final List<DataFileMeta> newFiles;
     private final List<DataFileMeta> deletedFiles;
@@ -88,6 +91,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
             RowType writeSchema,
             long maxSequenceNumber,
             CompactManager compactManager,
+            AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead,
             boolean forceCompact,
             DataFilePathFactory pathFactory,
             @Nullable CommitIncrement increment,
@@ -104,6 +108,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
         this.writeSchema = writeSchema;
         this.pathFactory = pathFactory;
         this.compactManager = compactManager;
+        this.bucketFileRead = bucketFileRead;
         this.forceCompact = forceCompact;
         this.newFiles = new ArrayList<>();
         this.deletedFiles = new ArrayList<>();
@@ -209,13 +214,25 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
     }
 
     public void toBufferedWriter() throws Exception {
-        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter()) {
-            flush(false, false);
-            trySyncLatestCompaction(true);
+        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && 
bucketFileRead != null) {
+            // fetch the written results
+            List<DataFileMeta> files = sinkWriter.flush();
 
             sinkWriter.close();
             sinkWriter = new BufferedSinkWriter(true, maxDiskSize, 
spillCompression);
             sinkWriter.setMemoryPool(memorySegmentPool);
+
+            // rewrite small files
+            try (RecordReaderIterator<InternalRow> reader = 
bucketFileRead.read(files)) {
+                while (reader.hasNext()) {
+                    sinkWriter.write(reader.next());
+                }
+            } finally {
+                // remove small files
+                for (DataFileMeta file : files) {
+                    fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+                }
+            }
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index f794b160c..a0d863371 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -47,6 +47,7 @@ import org.apache.paimon.utils.StatsCollectorFactories;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -145,6 +146,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
                 rowType,
                 maxSequenceNumber,
                 compactManager,
+                bucketReader(partition, bucket),
                 commitForceCompact,
                 factory,
                 restoreIncrement,
@@ -174,14 +176,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
                             fileCompression,
                             statsCollectors);
             try {
-                rewriter.write(
-                        new RecordReaderIterator<>(
-                                read.createReader(
-                                        DataSplit.builder()
-                                                .withPartition(partition)
-                                                .withBucket(bucket)
-                                                .withDataFiles(toCompact)
-                                                .build())));
+                rewriter.write(bucketReader(partition, 
bucket).read(toCompact));
             } finally {
                 rewriter.close();
             }
@@ -189,6 +184,17 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
         };
     }
 
+    public BucketFileRead bucketReader(BinaryRow partition, int bucket) {
+        return files ->
+                new RecordReaderIterator<>(
+                        read.createReader(
+                                DataSplit.builder()
+                                        .withPartition(partition)
+                                        .withBucket(bucket)
+                                        .withDataFiles(files)
+                                        .build()));
+    }
+
     public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
         // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act 
difference in
         // unaware-bucket mode (no compaction and force empty-writer).
@@ -215,4 +221,9 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
             }
         }
     }
+
+    /** Read for one bucket. */
+    public interface BucketFileRead {
+        RecordReaderIterator<InternalRow> read(List<DataFileMeta> files) 
throws IOException;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 87bb14745..565be9f90 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -594,6 +594,7 @@ public class AppendOnlyWriterTest {
                         AppendOnlyWriterTest.SCHEMA,
                         getMaxSequenceNumber(toCompact),
                         compactManager,
+                        null,
                         forceCompact,
                         pathFactory,
                         null,
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 22826d5bc..ffa290e0a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -79,6 +79,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                         0,
                         new AppendOnlyCompactManager(
                                 null, toCompact, 4, 10, 10, null, null), // 
not used
+                        null,
                         false,
                         dataFilePathFactory,
                         null,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
index 9281a7b35..c38fb4fba 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
@@ -42,10 +42,13 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 /** Tests for {@link AppendOnlyFileStoreWrite}. */
 public class AppendOnlyFileStoreWriteTest {
 
+    private static final Random RANDOM = new Random();
+
     @TempDir java.nio.file.Path tempDir;
 
     @Test
@@ -101,6 +104,49 @@ public class AppendOnlyFileStoreWriteTest {
         Assertions.assertThat(records).isEqualTo(11);
     }
 
+    @Test
+    public void testWritesInBatchWithNoExtraFiles() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) 
table.store().newWrite("ss");
+        write.withExecutionMode(false);
+
+        write.write(partition(0), 0, GenericRow.of(0, 0, 0));
+        write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+        write.write(partition(2), 2, GenericRow.of(2, 2, 0));
+        write.write(partition(3), 3, GenericRow.of(3, 3, 0));
+        write.write(partition(4), 4, GenericRow.of(4, 4, 0));
+        write.write(partition(5), 5, GenericRow.of(5, 5, 0));
+        write.write(partition(6), 6, GenericRow.of(6, 6, 0));
+
+        for (int i = 0; i < 1000; i++) {
+            int number = RANDOM.nextInt(7);
+            write.write(partition(number), number, GenericRow.of(number, 
number, 0));
+        }
+
+        List<CommitMessage> commit = write.prepareCommit(true, Long.MAX_VALUE);
+
+        Assertions.assertThat(commit.size()).isEqualTo(7);
+
+        long files =
+                commit.stream()
+                        .map(s -> (CommitMessageImpl) s)
+                        .mapToLong(s -> 
s.newFilesIncrement().newFiles().size())
+                        .sum();
+        Assertions.assertThat(files).isEqualTo(7);
+
+        long records =
+                commit.stream()
+                        .map(s -> (CommitMessageImpl) s)
+                        .mapToLong(
+                                s ->
+                                        
s.newFilesIncrement().newFiles().stream()
+                                                
.mapToLong(DataFileMeta::rowCount)
+                                                .sum())
+                        .sum();
+        Assertions.assertThat(records).isEqualTo(1007);
+    }
+
     protected FileStoreTable createFileStoreTable() throws Exception {
         Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new 
Path(tempDir.toString()));
         Schema schema =

Reply via email to