This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 7448de9c77 [core] Support off-peak hours in universal compaction (#5928) 7448de9c77 is described below commit 7448de9c77e079a442909dd6236f1ed2bd2479f6 Author: xiaochen <598457...@qq.com> AuthorDate: Mon Jul 21 21:12:09 2025 +0800 [core] Support off-peak hours in universal compaction (#5928) --- .../shortcodes/generated/core_configuration.html | 18 +++ .../main/java/org/apache/paimon/CoreOptions.java | 43 +++++++ .../main/java/org/apache/paimon/OffPeakHours.java | 108 ++++++++++++++++ .../apache/paimon/offpeak/OffPeakHoursTest.java | 140 +++++++++++++++++++++ .../mergetree/compact/UniversalCompaction.java | 46 ++++++- .../paimon/operation/KeyValueFileStoreWrite.java | 12 +- .../mergetree/compact/UniversalCompactionTest.java | 24 +++- 7 files changed, 381 insertions(+), 10 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index ed6ba5da13..f4276640e8 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -242,6 +242,24 @@ under the License. <td>Integer</td> <td>For file set [f_0,...,f_N], the minimum file number to trigger a compaction for append-only table.</td> </tr> + <tr> + <td><h5>compaction.offpeak-ratio</h5></td> + <td style="word-wrap: break-word;">0</td> + <td>Integer</td> + <td>Allows you to set a different (by default, more aggressive) percentage ratio for determining whether larger sorted run's size are included in compactions during off-peak hours. Works in the same way as compaction.size-ratio. Only applies if offpeak.start.hour and offpeak.end.hour are also enabled. <br /> For instance, if your cluster experiences low pressure between 2 AM and 6 PM , you can configure `compaction.offpeak.start.hour=2` and `compaction.offpeak.end.hour=18` to define this period as off-peak hours. 
During these hours, you can increase the off-peak compaction ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction</td> 
+ </tr> + <tr> + <td><h5>compaction.offpeak.end.hour</h5></td> + <td style="word-wrap: break-word;">-1</td> + <td>Integer</td> + <td>The end of off-peak hours, expressed as an integer between 0 and 23, inclusive. Set to -1 to disable off-peak.</td> + </tr> + <tr> + <td><h5>compaction.offpeak.start.hour</h5></td> + <td style="word-wrap: break-word;">-1</td> + <td>Integer</td> + <td>The start of off-peak hours, expressed as an integer between 0 and 23, inclusive Set to -1 to disable off-peak</td> + </tr> <tr> <td><h5>compaction.optimization-interval</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 19ca49737a..ba5cd54cad 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -683,6 +683,40 @@ public class CoreOptions implements Serializable { + "size is 1% smaller than the next sorted run's size, then include next sorted run " + "into this candidate set."); + public static final ConfigOption<Integer> COMPACT_OFFPEAK_START_HOUR = + key("compaction.offpeak.start.hour") + .intType() + .defaultValue(-1) + .withDescription( + "The start of off-peak hours, expressed as an integer between 0 and 23, inclusive" + + " Set to -1 to disable off-peak"); + + public static final ConfigOption<Integer> COMPACT_OFFPEAK_END_HOUR = + key("compaction.offpeak.end.hour") + .intType() + .defaultValue(-1) + .withDescription( + "The end of off-peak hours, expressed as an integer between 0 and 23, inclusive. 
Set" + + " to -1 to disable off-peak."); + + public static final ConfigOption<Integer> COMPACTION_OFFPEAK_RATIO = + key("compaction.offpeak-ratio") + .intType() + .defaultValue(0) + .withDescription( + Description.builder() + .text( + "Allows you to set a different (by default, more aggressive) percentage ratio for determining " + + " whether larger sorted run's size are included in compactions during off-peak hours. Works in the " + + " same way as compaction.size-ratio. Only applies if offpeak.start.hour and " + + " offpeak.end.hour are also enabled. ") + .linebreak() + .text( + " For instance, if your cluster experiences low pressure between 2 AM and 6 PM , " + + " you can configure `compaction.offpeak.start.hour=2` and `compaction.offpeak.end.hour=18` to define this period as off-peak hours. " + + " During these hours, you can increase the off-peak compaction ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction") + .build()); + public static final ConfigOption<Duration> COMPACTION_OPTIMIZATION_INTERVAL = key("compaction.optimization-interval") .durationType() @@ -2344,6 +2378,15 @@ public class CoreOptions implements Serializable { return options.get(COMPACTION_SIZE_RATIO); } + public OffPeakHours offPeakHours() { + return OffPeakHours.getInstance( + options.get(COMPACT_OFFPEAK_START_HOUR), options.get(COMPACT_OFFPEAK_END_HOUR)); + } + + public int compactOffPeakRatio() { + return options.get(COMPACTION_OFFPEAK_RATIO); + } + public int compactionMinFileNum() { return options.get(COMPACTION_MIN_FILE_NUM); } diff --git a/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java b/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java new file mode 100644 index 0000000000..35e934cf49 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZoneId; +import java.time.ZonedDateTime; + +/** OffPeakHours. */ +public abstract class OffPeakHours { + private static final Logger LOG = LoggerFactory.getLogger(OffPeakHours.class); + + public static final OffPeakHours DISABLED = + new OffPeakHours() { + @Override + public boolean isOffPeak() { + return false; + } + + @Override + public boolean isOffPeak(int targetHour) { + return false; + } + }; + + /** + * @param startHour inclusive + * @param endHour exclusive + */ + public static OffPeakHours getInstance(int startHour, int endHour) { + if (startHour == -1 && endHour == -1) { + return DISABLED; + } + + if (!isValidHour(startHour) || !isValidHour(endHour)) { + if (LOG.isWarnEnabled()) { + LOG.warn( + "Ignoring invalid start/end hour for peak hour : start = " + + startHour + + " end = " + + endHour + + ". Valid numbers are [0-23]"); + } + return DISABLED; + } + + if (startHour == endHour) { + return DISABLED; + } + + return new OffPeakHoursImpl(startHour, endHour); + } + + private static boolean isValidHour(int hour) { + return 0 <= hour && hour <= 23; + } + + /** Returns whether {@code targetHour} is off-peak hour. 
*/ + public abstract boolean isOffPeak(int targetHour); + + /** Returns whether it is off-peak hour. */ + public abstract boolean isOffPeak(); + + private static class OffPeakHoursImpl extends OffPeakHours { + final int startHour; + final int endHour; + + /** + * @param startHour inclusive + * @param endHour exclusive + */ + OffPeakHoursImpl(int startHour, int endHour) { + this.startHour = startHour; + this.endHour = endHour; + } + + @Override + public boolean isOffPeak() { + return isOffPeak(ZonedDateTime.now(ZoneId.systemDefault()).getHour()); + } + + @Override + public boolean isOffPeak(int targetHour) { + if (startHour <= endHour) { + return startHour <= targetHour && targetHour < endHour; + } + return targetHour < endHour || startHour <= targetHour; + } + } +} diff --git a/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java b/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java new file mode 100644 index 0000000000..92c786d35c --- /dev/null +++ b/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.offpeak; + +import org.apache.paimon.OffPeakHours; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OffPeakHours}. */ +public class OffPeakHoursTest { + + @Test + public void testDisabledInstance() { + OffPeakHours disabled = OffPeakHours.DISABLED; + assertThat(disabled.isOffPeak()).isFalse(); + for (int hour = 0; hour < 24; hour++) { + assertThat(disabled.isOffPeak(hour)).isFalse(); + } + } + + @Test + public void testGetInstanceWithDisabledValues() { + OffPeakHours offPeakHours = OffPeakHours.getInstance(-1, -1); + assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED); + } + + @Test + public void testGetInstanceWithSameStartAndEnd() { + for (int hour = 0; hour < 24; hour++) { + OffPeakHours offPeakHours = OffPeakHours.getInstance(hour, hour); + assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED); + } + } + + @ParameterizedTest + @ValueSource(ints = {-2, -1, 24, 25, 100}) + public void testGetInstanceWithInvalidStartHour(int invalidHour) { + OffPeakHours offPeakHours = OffPeakHours.getInstance(invalidHour, 10); + assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED); + } + + @ParameterizedTest + @ValueSource(ints = {-2, -1, 24, 25, 100}) + public void testGetInstanceWithInvalidEndHour(int invalidHour) { + OffPeakHours offPeakHours = OffPeakHours.getInstance(10, invalidHour); + assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED); + } + + @Test + public void testNormalRangeOffPeakHours() { + // Test normal range: 9 AM to 5 PM (9-17) + OffPeakHours offPeakHours = OffPeakHours.getInstance(9, 17); + + // Hours before start should not be off-peak + for (int hour = 0; hour < 9; hour++) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should not be off-peak", hour) + .isFalse(); + } + + // Hours in range should be off-peak (start 
inclusive, end exclusive) + for (int hour = 9; hour < 17; hour++) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should be off-peak", hour) + .isTrue(); + } + + // Hours after end should not be off-peak + for (int hour = 17; hour < 24; hour++) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should not be off-peak", hour) + .isFalse(); + } + } + + @Test + public void testWrapAroundRangeOffPeakHours() { + OffPeakHours offPeakHours = OffPeakHours.getInstance(22, 6); + + // Hours before end (0-5) should be off-peak + for (int hour = 0; hour < 6; hour++) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should be off-peak", hour) + .isTrue(); + } + + // Hours between end and start (6-21) should not be off-peak + for (int hour = 6; hour < 22; hour++) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should not be off-peak", hour) + .isFalse(); + } + + // Hours from start to end of day (22-23) should be off-peak + for (int hour = 22; hour < 24; hour++) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should be off-peak", hour) + .isTrue(); + } + } + + @Test + public void testSingleHourRange() { + // Test single hour range: 12 to 13 + OffPeakHours offPeakHours = OffPeakHours.getInstance(12, 13); + + // Only hour 12 should be off-peak + for (int hour = 0; hour < 24; hour++) { + if (hour == 12) { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should be off-peak", hour) + .isTrue(); + } else { + assertThat(offPeakHours.isOffPeak(hour)) + .as("Hour %d should not be off-peak", hour) + .isFalse(); + } + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java index c53d26ab4b..63e71354cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact; +import org.apache.paimon.OffPeakHours; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.mergetree.LevelSortedRun; @@ -47,6 +48,8 @@ public class UniversalCompaction implements CompactStrategy { private final int maxSizeAmp; private final int sizeRatio; private final int numRunCompactionTrigger; + private final OffPeakHours offPeakHours; + private final int compactOffPeakRatio; @Nullable private final Long opCompactionInterval; @Nullable private Long lastOptimizedCompaction; @@ -55,15 +58,22 @@ public class UniversalCompaction implements CompactStrategy { @Nullable private final AtomicInteger lookupCompactTriggerCount; public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) { - this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null); + this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null, OffPeakHours.DISABLED, 0); } public UniversalCompaction( int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger, - @Nullable Duration opCompactionInterval) { - this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, opCompactionInterval, null); + OffPeakHours offPeakHours, + int compactOffPeakRatio) { + this( + maxSizeAmp, + sizeRatio, + numRunCompactionTrigger, + null, + offPeakHours, + compactOffPeakRatio); } public UniversalCompaction( @@ -71,15 +81,36 @@ public class UniversalCompaction implements CompactStrategy { int sizeRatio, int numRunCompactionTrigger, @Nullable Duration opCompactionInterval, - @Nullable Integer maxLookupCompactInterval) { + OffPeakHours offPeakHours, + int compactOffPeakRatio) { + this( + maxSizeAmp, + sizeRatio, + numRunCompactionTrigger, + opCompactionInterval, + null, + offPeakHours, + compactOffPeakRatio); + } + + public UniversalCompaction( + int maxSizeAmp, + int sizeRatio, + int 
numRunCompactionTrigger, + @Nullable Duration opCompactionInterval, + @Nullable Integer maxLookupCompactInterval, + OffPeakHours offPeakHours, + int compactOffPeakRatio) { this.maxSizeAmp = maxSizeAmp; this.sizeRatio = sizeRatio; + this.offPeakHours = offPeakHours; this.numRunCompactionTrigger = numRunCompactionTrigger; this.opCompactionInterval = opCompactionInterval == null ? null : opCompactionInterval.toMillis(); this.maxLookupCompactInterval = maxLookupCompactInterval; this.lookupCompactTriggerCount = maxLookupCompactInterval == null ? null : new AtomicInteger(0); + this.compactOffPeakRatio = compactOffPeakRatio; } @Override @@ -203,7 +234,12 @@ public class UniversalCompaction implements CompactStrategy { long candidateSize = candidateSize(runs, candidateCount); for (int i = candidateCount; i < runs.size(); i++) { LevelSortedRun next = runs.get(i); - if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) { + if (candidateSize + * (100.0 + + sizeRatio + + (offPeakHours.isOffPeak() ? 
compactOffPeakRatio : 0)) + / 100.0 + < next.run().totalSize()) { break; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index e337db7e63..15787ea5de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -226,14 +226,18 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio(), options.numSortedRunCompactionTrigger(), - options.optimizedCompactionInterval())); + options.optimizedCompactionInterval(), + options.offPeakHours(), + options.compactOffPeakRatio())); } else if (CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) { return new UniversalCompaction( options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio(), options.numSortedRunCompactionTrigger(), options.optimizedCompactionInterval(), - options.lookupCompactMaxInterval()); + options.lookupCompactMaxInterval(), + options.offPeakHours(), + options.compactOffPeakRatio()); } } @@ -242,7 +246,9 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio(), options.numSortedRunCompactionTrigger(), - options.optimizedCompactionInterval()); + options.optimizedCompactionInterval(), + options.offPeakHours(), + options.compactOffPeakRatio()); if (options.compactionForceUpLevel0()) { return new ForceUpLevel0Compaction(universal); } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java index b5587e4752..e70034693b 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact; +import org.apache.paimon.OffPeakHours; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileSource; @@ -86,7 +87,8 @@ public class UniversalCompactionTest { public void testOptimizedCompactionInterval() { AtomicLong time = new AtomicLong(0); UniversalCompaction compaction = - new UniversalCompaction(100, 1, 3, Duration.ofMillis(1000)) { + new UniversalCompaction( + 100, 1, 3, Duration.ofMillis(1000), OffPeakHours.DISABLED, 0) { @Override long currentTimeMillis() { return time.get(); @@ -265,6 +267,23 @@ public class UniversalCompactionTest { .isEqualTo(new long[] {27}); } + @Test + public void testOffPeakRatioThreshold() { + + OffPeakHours offPeakHours = OffPeakHours.getInstance(0, 23); + long[] sizes = new long[] {8, 9, 10}; + assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2, offPeakHours, 0), sizes)) + .isEqualTo(new long[] {8, 9, 10}); + assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2, offPeakHours, 10), sizes)) + .isEqualTo(new long[] {27}); + + assertThat( + pickForSizeRatio( + new UniversalCompaction(25, 10, 2, OffPeakHours.DISABLED, 10), + sizes)) + .isEqualTo(new long[] {8, 9, 10}); + } + @Test public void testLookup() { ForceUpLevel0Compaction compaction = @@ -295,7 +314,8 @@ public class UniversalCompactionTest { @Test public void testForcePickL0() { int maxInterval = 5; - UniversalCompaction compaction = new UniversalCompaction(25, 1, 5, null, maxInterval); + UniversalCompaction compaction = + new UniversalCompaction(25, 1, 5, null, maxInterval, OffPeakHours.DISABLED, 0); // level 0 to max level List<LevelSortedRun> level0ToMax = level0(1, 2, 2, 2);