This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7510c576d [fix] Fix problem: after compaction, all the result files
are regarded as small files (#1680)
7510c576d is described below
commit 7510c576dd32ce48d09d4e3f1ef56c3de581cbd5
Author: YeJunHao <[email protected]>
AuthorDate: Sat Jul 29 22:05:12 2023 +0800
[fix] Fix problem: after compaction, all the result files are regarded as
small files (#1680)
---
paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 7 +++++++
.../paimon/append/AppendOnlyTableCompactionCoordinator.java | 4 +++-
.../apache/paimon/mergetree/compact/MergeTreeCompactManager.java | 9 +++++----
.../java/org/apache/paimon/operation/KeyValueFileStoreWrite.java | 2 +-
.../paimon/append/AppendOnlyTableCompactionCoordinatorTest.java | 8 ++++++++
.../test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java | 2 +-
6 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 85bb06460..2711ccbca 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -976,6 +976,13 @@ public class CoreOptions implements Serializable {
return options.get(TARGET_FILE_SIZE).getBytes();
}
+ public long compactionFileSize() {
+ // File size threshold for joining compaction. Middle-sized files are skipped to avoid
+ // compacting the same file twice (compression cannot be estimated accurately, so a file
+ // produced by the rolling file writer may be smaller than the target file size).
+ return options.get(TARGET_FILE_SIZE).getBytes() / 10 * 7;
+ }
+
public int numSortedRunCompactionTrigger() {
return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index 7277d5e26..cd284af7f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -62,6 +62,7 @@ public class AppendOnlyTableCompactionCoordinator {
private final InnerTableScan scan;
private final long targetFileSize;
+ private final long compactionFileSize;
private final int minFileNum;
private final int maxFileNum;
private final boolean streamingMode;
@@ -93,6 +94,7 @@ public class AppendOnlyTableCompactionCoordinator {
this.streamingMode = isStreaming;
CoreOptions coreOptions = table.coreOptions();
this.targetFileSize = coreOptions.targetFileSize();
+ this.compactionFileSize = coreOptions.compactionFileSize();
this.minFileNum = coreOptions.compactionMinFileNum();
this.maxFileNum = coreOptions.compactionMaxFileNum();
}
@@ -132,7 +134,7 @@ public class AppendOnlyTableCompactionCoordinator {
.computeIfAbsent(partition, PartitionCompactCoordinator::new)
.addFiles(
files.stream()
- .filter(file -> file.fileSize() < targetFileSize)
+ .filter(file -> file.fileSize() < compactionFileSize)
.collect(Collectors.toList()));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f56f63624..300bfe84b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -49,7 +49,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private final Levels levels;
private final CompactStrategy strategy;
private final Comparator<InternalRow> keyComparator;
- private final long minFileSize;
+ private final long compactionFileSize;
private final int numSortedRunStopTrigger;
private final CompactRewriter rewriter;
@@ -58,13 +58,13 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
Levels levels,
CompactStrategy strategy,
Comparator<InternalRow> keyComparator,
- long minFileSize,
+ long compactionFileSize,
int numSortedRunStopTrigger,
CompactRewriter rewriter) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
- this.minFileSize = minFileSize;
+ this.compactionFileSize = compactionFileSize;
this.numSortedRunStopTrigger = numSortedRunStopTrigger;
this.keyComparator = keyComparator;
this.rewriter = rewriter;
@@ -161,7 +161,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
MergeTreeCompactTask task =
- new MergeTreeCompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
+ new MergeTreeCompactTask(
+ keyComparator, compactionFileSize, rewriter, unit, dropDelete);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index ffd7c6081..6a27d67d2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -196,7 +196,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
levels,
compactStrategy,
keyComparator,
- options.targetFileSize(),
+ options.compactionFileSize(),
options.numSortedRunStopTrigger(),
rewriter);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index 2ea765d47..15231c3a9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -72,6 +72,14 @@ public class AppendOnlyTableCompactionCoordinatorTest {
assertTasks(files, 100 / 3);
}
+ @Test
+ public void testFilterMiddleFile() {
+ List<DataFileMeta> files =
+ generateNewFiles(
+ 100, appendOnlyFileStoreTable.coreOptions().targetFileSize() / 10 * 8);
+ assertTasks(files, 0);
+ }
+
@Test
public void testEliminatePartitionCoordinator() {
List<DataFileMeta> files = generateNewFiles(1, 0);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index b2592e70a..cdcd906a5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -444,7 +444,7 @@ public abstract class MergeTreeTestBase {
new Levels(comparator, files, options.numLevels()),
strategy,
comparator,
- options.targetFileSize(),
+ options.compactionFileSize(),
options.numSortedRunStopTrigger(),
new TestRewriter());
}