This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new db0076f47 [core] Introduce compaction.optimization-interval (#2711)
db0076f47 is described below

commit db0076f471350f6825aab2ca57368389dcea52be
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 16 21:17:35 2024 +0800

    [core] Introduce compaction.optimization-interval (#2711)
---
 docs/content/maintenance/read-performance.md       | 30 ++---------
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../main/java/org/apache/paimon/CoreOptions.java   | 13 +++++
 .../mergetree/compact/UniversalCompaction.java     | 37 ++++++++++++-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  3 +-
 .../mergetree/compact/UniversalCompactionTest.java | 61 +++++++++++++++++++---
 6 files changed, 117 insertions(+), 33 deletions(-)

diff --git a/docs/content/maintenance/read-performance.md 
b/docs/content/maintenance/read-performance.md
index 796391c68..6b9ba3462 100644
--- a/docs/content/maintenance/read-performance.md
+++ b/docs/content/maintenance/read-performance.md
@@ -26,17 +26,7 @@ under the License.
 
 # Read Performance
 
-## Full Compaction
-
-Configure 'full-compaction.delta-commits' perform full-compaction periodically 
in Flink writing.
-And it can ensure that partitions are full compacted before writing ends.
-
-{{< hint info >}}
-Paimon defaults to handling small files and providing decent read performance. 
Please do not configure
-this full-compaction option without any requirements, as it will have a 
significant impact on performance.
-{{< /hint >}}
-
-### Primary Key Table
+## Primary Key Table
 
 For Primary Key Table, it's a 'MergeOnRead' technology. When reading data, 
multiple layers of LSM data are merged,
 and the number of parallelism will be limited by the number of buckets. 
Although Paimon's merge performance is efficient,
@@ -44,19 +34,9 @@ it still cannot catch up with the ordinary AppendOnly table.
 
 If you want to query fast enough in certain scenarios, but can only find older 
data, you can:
 
-1. Configure 'full-compaction.delta-commits' when writing data (currently only 
in Flink). For streaming jobs, full compaction will then be performed 
periodically; For batch jobs, full compaction will be carried out when the job 
ends.
-2. Query from [read-optimized system table]({{< ref 
"how-to/system-tables#read-optimized-table" >}}). Reading from results of full 
compaction avoids merging records with the same key, thus improving reading 
performance.
+1. Configure 'compaction.optimization-interval' when writing data. For 
streaming jobs, optimized compaction will then
+   be performed periodically; for batch jobs, optimized compaction will be 
carried out when the job ends.
+2. Query from [read-optimized system table]({{< ref 
"how-to/system-tables#read-optimized-table" >}}). Reading from
+   the optimized files avoids merging records with the same key, thus 
improving read performance.
 
 You can flexibly balance query performance and data latency when reading.
-
-### Append Only Table
-
-Small files will slow down reading performance and affect the stability of 
DFS. By default, when there are more than 
-'compaction.max.file-num' (default 50) small files in a single bucket, a 
compaction task will be triggered to compact 
-them. Furthermore, if there are multiple buckets, many small files will be 
generated.
-
-You can use full-compaction to reduce small files. Full-compaction will 
eliminate most small files.
-
-## Format
-
-Paimon has some query optimizations to parquet reading, so parquet will be 
slightly faster that orc.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index d5f596be2..946681796 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -92,6 +92,12 @@ under the License.
             <td>Integer</td>
             <td>For file set [f_0,...,f_N], the minimum file number which 
satisfies sum(size(f_i)) &gt;= targetFileSize to trigger a compaction for 
append-only table. This value avoids almost-full-file to be compacted, which is 
not cost-effective.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.optimization-interval</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Implying how often to perform an optimization compaction, this 
configuration is used to ensure the query timeliness of the read-optimized 
system table.</td>
+        </tr>
         <tr>
             <td><h5>compaction.size-ratio</h5></td>
             <td style="word-wrap: break-word;">1</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index d1b8840d8..8337db566 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -408,6 +408,14 @@ public class CoreOptions implements Serializable {
                                     + "size is 1% smaller than the next sorted 
run's size, then include next sorted run "
                                     + "into this candidate set.");
 
+    public static final ConfigOption<Duration> 
COMPACTION_OPTIMIZATION_INTERVAL =
+            key("compaction.optimization-interval")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Implying how often to perform an optimization 
compaction, this configuration is used to "
+                                    + "ensure the query timeliness of the 
read-optimized system table.");
+
     public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
             key("compaction.min.file-num")
                     .intType()
@@ -1253,6 +1261,11 @@ public class CoreOptions implements Serializable {
         return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
     }
 
+    @Nullable
+    public Duration optimizedCompactionInterval() {
+        return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
+    }
+
     public int numSortedRunStopTrigger() {
         Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
         if (stopTrigger == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 5db0fa5e5..c31aec682 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -26,6 +26,9 @@ import org.apache.paimon.mergetree.SortedRun;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 
@@ -44,16 +47,38 @@ public class UniversalCompaction implements CompactStrategy 
{
     private final int sizeRatio;
     private final int numRunCompactionTrigger;
 
+    @Nullable private final Long opCompactionInterval;
+    @Nullable private Long lastOptimizedCompaction;
+
     public UniversalCompaction(int maxSizeAmp, int sizeRatio, int 
numRunCompactionTrigger) {
+        this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null);
+    }
+
+    public UniversalCompaction(
+            int maxSizeAmp,
+            int sizeRatio,
+            int numRunCompactionTrigger,
+            @Nullable Duration opCompactionInterval) {
         this.maxSizeAmp = maxSizeAmp;
         this.sizeRatio = sizeRatio;
         this.numRunCompactionTrigger = numRunCompactionTrigger;
+        this.opCompactionInterval =
+                opCompactionInterval == null ? null : 
opCompactionInterval.toMillis();
     }
 
     @Override
     public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> 
runs) {
         int maxLevel = numLevels - 1;
 
+        if (opCompactionInterval != null) {
+            if (lastOptimizedCompaction == null
+                    || currentTimeMillis() - lastOptimizedCompaction > 
opCompactionInterval) {
+                LOG.debug("Universal compaction due to optimized compaction 
interval");
+                updateLastOptimizedCompaction();
+                return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+            }
+        }
+
         // 1 checking for reducing size amplification
         CompactUnit unit = pickForSizeAmp(maxLevel, runs);
         if (unit != null) {
@@ -101,6 +126,7 @@ public class UniversalCompaction implements CompactStrategy 
{
 
         // size amplification = percentage of additional size
         if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
+            updateLastOptimizedCompaction();
             return CompactUnit.fromLevelRuns(maxLevel, runs);
         }
 
@@ -150,7 +176,7 @@ public class UniversalCompaction implements CompactStrategy 
{
     }
 
     @VisibleForTesting
-    static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int 
runCount) {
+    CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int 
runCount) {
         int outputLevel;
         if (runCount == runs.size()) {
             outputLevel = maxLevel;
@@ -172,9 +198,18 @@ public class UniversalCompaction implements 
CompactStrategy {
         }
 
         if (runCount == runs.size()) {
+            updateLastOptimizedCompaction();
             outputLevel = maxLevel;
         }
 
         return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, 
runCount));
     }
+
+    private void updateLastOptimizedCompaction() {
+        lastOptimizedCompaction = currentTimeMillis();
+    }
+
+    long currentTimeMillis() {
+        return System.currentTimeMillis();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 0bb4cf1fc..fd71dfce4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -156,7 +156,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 new UniversalCompaction(
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
-                        options.numSortedRunCompactionTrigger());
+                        options.numSortedRunCompactionTrigger(),
+                        options.optimizedCompactionInterval());
         CompactStrategy compactStrategy =
                 options.changelogProducer() == ChangelogProducer.LOOKUP
                         ? new LookupCompaction(universalCompaction)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 6e5127c63..328f2e7c4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -25,13 +25,14 @@ import org.apache.paimon.mergetree.SortedRun;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.mergetree.compact.UniversalCompaction.createUnit;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link UniversalCompaction}. */
@@ -39,11 +40,17 @@ public class UniversalCompactionTest {
 
     @Test
     public void testOutputLevel() {
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 
1).outputLevel()).isEqualTo(1);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 
2).outputLevel()).isEqualTo(1);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 
3).outputLevel()).isEqualTo(2);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 
4).outputLevel()).isEqualTo(3);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 
5).outputLevel()).isEqualTo(5);
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+        assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
1).outputLevel())
+                .isEqualTo(1);
+        assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
2).outputLevel())
+                .isEqualTo(1);
+        assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
3).outputLevel())
+                .isEqualTo(2);
+        assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
4).outputLevel())
+                .isEqualTo(3);
+        assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
5).outputLevel())
+                .isEqualTo(5);
     }
 
     @Test
@@ -74,6 +81,48 @@ public class UniversalCompactionTest {
         assertThat(results).isEqualTo(new long[] {1, 2, 3});
     }
 
+    @Test
+    public void testOptimizedCompactionInterval() {
+        AtomicLong time = new AtomicLong(0);
+        UniversalCompaction compaction =
+                new UniversalCompaction(100, 1, 3, Duration.ofMillis(1000)) {
+                    @Override
+                    long currentTimeMillis() {
+                        return time.get();
+                    }
+                };
+
+        // first time, force optimized compaction
+        Optional<CompactUnit> pick =
+                compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), 
level(2, 5)));
+        assertThat(pick.isPresent()).isTrue();
+        long[] results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 3, 5});
+
+        // modify time, optimized compaction
+        time.set(1001);
+        pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), 
level(2, 5)));
+        assertThat(pick.isPresent()).isTrue();
+        results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 3, 5});
+
+        // third time, no compaction
+        pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), 
level(2, 5)));
+        assertThat(pick.isPresent()).isFalse();
+
+        // 4 time, pickForSizeAmp
+        time.set(1500);
+        pick = compaction.pick(3, Arrays.asList(level(0, 3), level(1, 3), 
level(2, 5)));
+        assertThat(pick.isPresent()).isTrue();
+        results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {3, 3, 5});
+
+        // 5 time, no compaction because pickForSizeAmp already done
+        time.set(2001);
+        pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), 
level(2, 5)));
+        assertThat(pick.isPresent()).isFalse();
+    }
+
     @Test
     public void testNoOutputLevel0() {
         UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);

Reply via email to