This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new db0076f47 [core] Introduce compaction.optimization-interval (#2711)
db0076f47 is described below
commit db0076f471350f6825aab2ca57368389dcea52be
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 16 21:17:35 2024 +0800
[core] Introduce compaction.optimization-interval (#2711)
---
docs/content/maintenance/read-performance.md | 30 ++---------
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 13 +++++
.../mergetree/compact/UniversalCompaction.java | 37 ++++++++++++-
.../paimon/operation/KeyValueFileStoreWrite.java | 3 +-
.../mergetree/compact/UniversalCompactionTest.java | 61 +++++++++++++++++++---
6 files changed, 117 insertions(+), 33 deletions(-)
diff --git a/docs/content/maintenance/read-performance.md b/docs/content/maintenance/read-performance.md
index 796391c68..6b9ba3462 100644
--- a/docs/content/maintenance/read-performance.md
+++ b/docs/content/maintenance/read-performance.md
@@ -26,17 +26,7 @@ under the License.
# Read Performance
-## Full Compaction
-
-Configure 'full-compaction.delta-commits' perform full-compaction periodically in Flink writing.
-And it can ensure that partitions are full compacted before writing ends.
-
-{{< hint info >}}
-Paimon defaults to handling small files and providing decent read performance. Please do not configure
-this full-compaction option without any requirements, as it will have a significant impact on performance.
-{{< /hint >}}
-
-### Primary Key Table
+## Primary Key Table
For Primary Key Table, it's a 'MergeOnRead' technology. When reading data,
multiple layers of LSM data are merged,
and the number of parallelism will be limited by the number of buckets.
Although Paimon's merge performance is efficient,
@@ -44,19 +34,9 @@ it still cannot catch up with the ordinary AppendOnly table.
If you want to query fast enough in certain scenarios, but can only find older
data, you can:
-1. Configure 'full-compaction.delta-commits' when writing data (currently only in Flink). For streaming jobs, full compaction will then be performed periodically; For batch jobs, full compaction will be carried out when the job ends.
-2. Query from [read-optimized system table]({{< ref "how-to/system-tables#read-optimized-table" >}}). Reading from results of full compaction avoids merging records with the same key, thus improving reading performance.
+1. Configure 'compaction.optimization-interval' when writing data. For streaming jobs, optimized compaction will then
+ be performed periodically; For batch jobs, optimized compaction will be carried out when the job ends.
+2. Query from [read-optimized system table]({{< ref "how-to/system-tables#read-optimized-table" >}}). Reading from
+ results of optimized files avoids merging records with the same key, thus improving reading performance.
You can flexibly balance query performance and data latency when reading.
-
-### Append Only Table
-
-Small files will slow down reading performance and affect the stability of DFS. By default, when there are more than
-'compaction.max.file-num' (default 50) small files in a single bucket, a compaction task will be triggered to compact
-them. Furthermore, if there are multiple buckets, many small files will be generated.
-
-You can use full-compaction to reduce small files. Full-compaction will eliminate most small files.
-
-## Format
-
-Paimon has some query optimizations to parquet reading, so parquet will be slightly faster that orc.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d5f596be2..946681796 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -92,6 +92,12 @@ under the License.
<td>Integer</td>
<td>For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for append-only table. This value avoids almost-full-file to be compacted, which is not cost-effective.</td>
</tr>
+ <tr>
+ <td><h5>compaction.optimization-interval</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Implying how often to perform an optimization compaction, this configuration is used to ensure the query timeliness of the read-optimized system table.</td>
+ </tr>
<tr>
<td><h5>compaction.size-ratio</h5></td>
<td style="word-wrap: break-word;">1</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index d1b8840d8..8337db566 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -408,6 +408,14 @@ public class CoreOptions implements Serializable {
+ "size is 1% smaller than the next sorted run's size, then include next sorted run "
+ "into this candidate set.");
+ public static final ConfigOption<Duration> COMPACTION_OPTIMIZATION_INTERVAL =
+ key("compaction.optimization-interval")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Implying how often to perform an optimization compaction, this configuration is used to "
+ + "ensure the query timeliness of the read-optimized system table.");
+
public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
key("compaction.min.file-num")
.intType()
@@ -1253,6 +1261,11 @@ public class CoreOptions implements Serializable {
return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
}
+ @Nullable
+ public Duration optimizedCompactionInterval() {
+ return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
+ }
+
public int numSortedRunStopTrigger() {
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
if (stopTrigger == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 5db0fa5e5..c31aec682 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -26,6 +26,9 @@ import org.apache.paimon.mergetree.SortedRun;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
import java.util.List;
import java.util.Optional;
@@ -44,16 +47,38 @@ public class UniversalCompaction implements CompactStrategy {
private final int sizeRatio;
private final int numRunCompactionTrigger;
+ @Nullable private final Long opCompactionInterval;
+ @Nullable private Long lastOptimizedCompaction;
+
public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
+ this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null);
+ }
+
+ public UniversalCompaction(
+ int maxSizeAmp,
+ int sizeRatio,
+ int numRunCompactionTrigger,
+ @Nullable Duration opCompactionInterval) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
this.numRunCompactionTrigger = numRunCompactionTrigger;
+ this.opCompactionInterval =
+ opCompactionInterval == null ? null : opCompactionInterval.toMillis();
}
@Override
public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
int maxLevel = numLevels - 1;
+ if (opCompactionInterval != null) {
+ if (lastOptimizedCompaction == null
+ || currentTimeMillis() - lastOptimizedCompaction > opCompactionInterval) {
+ LOG.debug("Universal compaction due to optimized compaction interval");
+ updateLastOptimizedCompaction();
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ }
+ }
+
// 1 checking for reducing size amplification
CompactUnit unit = pickForSizeAmp(maxLevel, runs);
if (unit != null) {
@@ -101,6 +126,7 @@ public class UniversalCompaction implements CompactStrategy {
// size amplification = percentage of additional size
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
+ updateLastOptimizedCompaction();
return CompactUnit.fromLevelRuns(maxLevel, runs);
}
@@ -150,7 +176,7 @@ public class UniversalCompaction implements CompactStrategy {
}
@VisibleForTesting
- static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
+ CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
int outputLevel;
if (runCount == runs.size()) {
outputLevel = maxLevel;
@@ -172,9 +198,18 @@ public class UniversalCompaction implements CompactStrategy {
}
if (runCount == runs.size()) {
+ updateLastOptimizedCompaction();
outputLevel = maxLevel;
}
return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}
+
+ private void updateLastOptimizedCompaction() {
+ lastOptimizedCompaction = currentTimeMillis();
+ }
+
+ long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 0bb4cf1fc..fd71dfce4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -156,7 +156,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger());
+ options.numSortedRunCompactionTrigger(),
+ options.optimizedCompactionInterval());
CompactStrategy compactStrategy =
options.changelogProducer() == ChangelogProducer.LOOKUP
? new LookupCompaction(universalCompaction)
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 6e5127c63..328f2e7c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -25,13 +25,14 @@ import org.apache.paimon.mergetree.SortedRun;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import static org.apache.paimon.mergetree.compact.UniversalCompaction.createUnit;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link UniversalCompaction}. */
@@ -39,11 +40,17 @@ public class UniversalCompactionTest {
@Test
public void testOutputLevel() {
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(1);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(1);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5);
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+ assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel())
+ .isEqualTo(1);
+ assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel())
+ .isEqualTo(1);
+ assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel())
+ .isEqualTo(2);
+ assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel())
+ .isEqualTo(3);
+ assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel())
+ .isEqualTo(5);
}
@Test
@@ -74,6 +81,48 @@ public class UniversalCompactionTest {
assertThat(results).isEqualTo(new long[] {1, 2, 3});
}
+ @Test
+ public void testOptimizedCompactionInterval() {
+ AtomicLong time = new AtomicLong(0);
+ UniversalCompaction compaction =
+ new UniversalCompaction(100, 1, 3, Duration.ofMillis(1000)) {
+ @Override
+ long currentTimeMillis() {
+ return time.get();
+ }
+ };
+
+ // first time, force optimized compaction
+ Optional<CompactUnit> pick =
+ compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), level(2, 5)));
+ assertThat(pick.isPresent()).isTrue();
+ long[] results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 3, 5});
+
+ // modify time, optimized compaction
+ time.set(1001);
+ pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), level(2, 5)));
+ assertThat(pick.isPresent()).isTrue();
+ results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 3, 5});
+
+ // third time, no compaction
+ pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), level(2, 5)));
+ assertThat(pick.isPresent()).isFalse();
+
+ // 4 time, pickForSizeAmp
+ time.set(1500);
+ pick = compaction.pick(3, Arrays.asList(level(0, 3), level(1, 3), level(2, 5)));
+ assertThat(pick.isPresent()).isTrue();
+ results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {3, 3, 5});
+
+ // 5 time, no compaction because pickForSizeAmp already done
+ time.set(2001);
+ pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), level(2, 5)));
+ assertThat(pick.isPresent()).isFalse();
+ }
+
@Test
public void testNoOutputLevel0() {
UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);