This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b1b0acd79 [core] Increase default sorted run stop trigger (#3220)
b1b0acd79 is described below
commit b1b0acd79168e07b9061c285cbe1b2dd06e501b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 22 10:59:53 2024 +0800
[core] Increase default sorted run stop trigger (#3220)
---
docs/content/maintenance/write-performance.md | 2 +-
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 4 ++--
.../java/org/apache/paimon/utils/MathUtils.java | 9 +++++++++
.../apache/paimon/mergetree/MergeTreeWriter.java | 16 ++++++++++++----
.../apache/paimon/mergetree/MergeTreeTestBase.java | 21 ++++++++++++---------
6 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 929f6c17a..0129c07a0 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -110,7 +110,7 @@ the threshold.
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1.</td>
+ <td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 48822af1b..468da3f5a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -405,7 +405,7 @@ This config option does not affect the default filesystem metastore.</td>
<td><h5>num-sorted-run.stop-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1.</td>
+ <td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
<tr>
<td><h5>page-size</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 2440bdcc6..86bcb91fd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -421,7 +421,7 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription(
"The number of sorted runs that trigger the stopping of writes,"
- + " the default value is 'num-sorted-run.compaction-trigger' + 1.");
+ + " the default value is 'num-sorted-run.compaction-trigger' + 3.");
public static final ConfigOption<Integer> NUM_LEVELS =
key("num-levels")
@@ -1415,7 +1415,7 @@ public class CoreOptions implements Serializable {
public int numSortedRunStopTrigger() {
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
if (stopTrigger == null) {
- stopTrigger = MathUtils.incrementSafely(numSortedRunCompactionTrigger());
+ stopTrigger = MathUtils.addSafely(numSortedRunCompactionTrigger(), 3);
}
return Math.max(numSortedRunCompactionTrigger(), stopTrigger);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
index c50646bd2..ba9994d9b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
@@ -101,4 +101,13 @@ public class MathUtils {
}
return a + 1;
}
+
+ /** Safely add the given int value by another int value, ensuring that no overflow occurs. */
+ public static int addSafely(int a, int b) {
+ try {
+ return Math.addExact(a, b);
+ } catch (ArithmeticException e) {
+ return Integer.MAX_VALUE;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index f525c6666..17673fc06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -241,10 +241,18 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
flushWriteBuffer(waitCompaction, false);
- trySyncLatestCompaction(
- waitCompaction
- || commitForceCompact
- || compactManager.shouldWaitForPreparingCheckpoint());
+ if (commitForceCompact) {
+ waitCompaction = true;
+ }
+ // Decide again whether to wait here.
+ // For example, in the case of repeated failures in writing, it is possible that Level 0
+ // files were successfully committed, but failed to restart during the compaction phase,
+ // which may result in an increasing number of Level 0 files. This wait can avoid this
+ // situation.
+ if (compactManager.shouldWaitForPreparingCheckpoint()) {
+ waitCompaction = true;
+ }
+ trySyncLatestCompaction(waitCompaction);
return drainIncrement();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index a1e55f73f..3f9bd3e6e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -133,12 +133,15 @@ public abstract class MergeTreeTestBase {
}
private void recreateMergeTree(long targetFileSize) {
- Options configuration = new Options();
- configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
- configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
- configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
- configuration.set(CoreOptions.SORT_ENGINE, getSortEngine());
- options = new CoreOptions(configuration);
+ Options options = new Options();
+ options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ options.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
+ options.set(CoreOptions.SORT_ENGINE, getSortEngine());
+ options.set(
+ CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER,
+ options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
+ this.options = new CoreOptions(options);
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
@@ -188,9 +191,9 @@ public abstract class MergeTreeTestBase {
valueType,
flushingAvro,
pathFactoryMap,
- options.targetFileSize());
- writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
- compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
+ this.options.targetFileSize());
+ writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options);
+ compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options);
writer = createMergeTreeWriter(Collections.emptyList());
}