This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ca88ed985 [core] Reduce small files while batch write append only table with a lot of partitions. (#3160)
ca88ed985 is described below
commit ca88ed98595fb5692cfc6555754531f83a671810
Author: YeJunHao <[email protected]>
AuthorDate: Sun Apr 7 15:02:17 2024 +0800
[core] Reduce small files while batch write append only table with a lot of partitions. (#3160)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 23 +++++++++--
.../paimon/operation/AppendOnlyFileStoreWrite.java | 27 +++++++++----
.../apache/paimon/append/AppendOnlyWriterTest.java | 1 +
.../apache/paimon/format/FileFormatSuffixTest.java | 1 +
.../operation/AppendOnlyFileStoreWriteTest.java | 46 ++++++++++++++++++++++
5 files changed, 87 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4b00eae1f..9d9f41897 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -33,7 +33,9 @@ import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
@@ -64,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
private final CompactManager compactManager;
+ private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead;
private final boolean forceCompact;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
@@ -88,6 +91,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
RowType writeSchema,
long maxSequenceNumber,
CompactManager compactManager,
+ AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead,
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
@@ -104,6 +108,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
this.writeSchema = writeSchema;
this.pathFactory = pathFactory;
this.compactManager = compactManager;
+ this.bucketFileRead = bucketFileRead;
this.forceCompact = forceCompact;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
@@ -209,13 +214,25 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
}
public void toBufferedWriter() throws Exception {
- if (sinkWriter != null && !sinkWriter.bufferSpillableWriter()) {
- flush(false, false);
- trySyncLatestCompaction(true);
+ if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() &&
bucketFileRead != null) {
+ // fetch the written results
+ List<DataFileMeta> files = sinkWriter.flush();
sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true, maxDiskSize,
spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);
+
+ // rewrite small files
+ try (RecordReaderIterator<InternalRow> reader =
bucketFileRead.read(files)) {
+ while (reader.hasNext()) {
+ sinkWriter.write(reader.next());
+ }
+ } finally {
+ // remove small files
+ for (DataFileMeta file : files) {
+ fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+ }
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index f794b160c..a0d863371 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -47,6 +47,7 @@ import org.apache.paimon.utils.StatsCollectorFactories;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -145,6 +146,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
rowType,
maxSequenceNumber,
compactManager,
+ bucketReader(partition, bucket),
commitForceCompact,
factory,
restoreIncrement,
@@ -174,14 +176,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
fileCompression,
statsCollectors);
try {
- rewriter.write(
- new RecordReaderIterator<>(
- read.createReader(
- DataSplit.builder()
- .withPartition(partition)
- .withBucket(bucket)
- .withDataFiles(toCompact)
- .build())));
+ rewriter.write(bucketReader(partition,
bucket).read(toCompact));
} finally {
rewriter.close();
}
@@ -189,6 +184,17 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
};
}
+ public BucketFileRead bucketReader(BinaryRow partition, int bucket) {
+ return files ->
+ new RecordReaderIterator<>(
+ read.createReader(
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withDataFiles(files)
+ .build()));
+ }
+
public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
// AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act
difference in
// unaware-bucket mode (no compaction and force empty-writer).
@@ -215,4 +221,9 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
}
}
}
+
+ /** Read for one bucket. */
+ public interface BucketFileRead {
+ RecordReaderIterator<InternalRow> read(List<DataFileMeta> files)
throws IOException;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 87bb14745..565be9f90 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -594,6 +594,7 @@ public class AppendOnlyWriterTest {
AppendOnlyWriterTest.SCHEMA,
getMaxSequenceNumber(toCompact),
compactManager,
+ null,
forceCompact,
pathFactory,
null,
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 22826d5bc..ffa290e0a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -79,6 +79,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
0,
new AppendOnlyCompactManager(
null, toCompact, 4, 10, 10, null, null), //
not used
+ null,
false,
dataFilePathFactory,
null,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
index 9281a7b35..c38fb4fba 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
@@ -42,10 +42,13 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.List;
import java.util.Map;
+import java.util.Random;
/** Tests for {@link AppendOnlyFileStoreWrite}. */
public class AppendOnlyFileStoreWriteTest {
+ private static final Random RANDOM = new Random();
+
@TempDir java.nio.file.Path tempDir;
@Test
@@ -101,6 +104,49 @@ public class AppendOnlyFileStoreWriteTest {
Assertions.assertThat(records).isEqualTo(11);
}
+ @Test
+ public void testWritesInBatchWithNoExtraFiles() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite)
table.store().newWrite("ss");
+ write.withExecutionMode(false);
+
+ write.write(partition(0), 0, GenericRow.of(0, 0, 0));
+ write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+ write.write(partition(2), 2, GenericRow.of(2, 2, 0));
+ write.write(partition(3), 3, GenericRow.of(3, 3, 0));
+ write.write(partition(4), 4, GenericRow.of(4, 4, 0));
+ write.write(partition(5), 5, GenericRow.of(5, 5, 0));
+ write.write(partition(6), 6, GenericRow.of(6, 6, 0));
+
+ for (int i = 0; i < 1000; i++) {
+ int number = RANDOM.nextInt(7);
+ write.write(partition(number), number, GenericRow.of(number,
number, 0));
+ }
+
+ List<CommitMessage> commit = write.prepareCommit(true, Long.MAX_VALUE);
+
+ Assertions.assertThat(commit.size()).isEqualTo(7);
+
+ long files =
+ commit.stream()
+ .map(s -> (CommitMessageImpl) s)
+ .mapToLong(s ->
s.newFilesIncrement().newFiles().size())
+ .sum();
+ Assertions.assertThat(files).isEqualTo(7);
+
+ long records =
+ commit.stream()
+ .map(s -> (CommitMessageImpl) s)
+ .mapToLong(
+ s ->
+
s.newFilesIncrement().newFiles().stream()
+
.mapToLong(DataFileMeta::rowCount)
+ .sum())
+ .sum();
+ Assertions.assertThat(records).isEqualTo(1007);
+ }
+
protected FileStoreTable createFileStoreTable() throws Exception {
Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new
Path(tempDir.toString()));
Schema schema =