This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e26fb64b9 [core] Lookup wait should affect deletion vectors mode and first row engine (#3782)
e26fb64b9 is described below
commit e26fb64b9dc95ec4bceacd94a1794d3c8bd7e3da
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 22:16:59 2024 +0800
[core] Lookup wait should affect deletion vectors mode and first row engine (#3782)
---
docs/content/primary-key-table/compaction.md | 5 +----
.../main/java/org/apache/paimon/CoreOptions.java | 18 +++++++++---------
.../java/org/apache/paimon/CoreOptionsTest.java | 22 ++++++++++++++++++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 3 +--
.../sink/MultiTablesStoreCompactOperator.java | 3 +--
.../flink/PrimaryKeyFileStoreTableITCase.java | 6 ++----
6 files changed, 36 insertions(+), 21 deletions(-)
diff --git a/docs/content/primary-key-table/compaction.md b/docs/content/primary-key-table/compaction.md
index c7c866581..e5040a03a 100644
--- a/docs/content/primary-key-table/compaction.md
+++ b/docs/content/primary-key-table/compaction.md
@@ -59,15 +59,12 @@ You can use the following strategies for your table:
```shell
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
-changelog-producer.lookup-wait = false
+lookup-wait = false
```
This configuration will generate more files during peak write periods and gradually merge into optimal read
performance during low write periods.
-In the case of `'changelog-producer' = 'lookup'`, by default, the lookup will be completed at checkpointing, which
-will block the checkpoint. So if you want an asynchronous lookup, you should also set `'changelog-producer.lookup-wait' = 'false'`.
-
## Dedicated compaction job
In general, if you expect multiple jobs to be written to the same table, you need to separate the compaction. You can
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 5b2f8aa77..b368b483a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1279,16 +1279,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");
- public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
- key("changelog-producer.lookup-wait")
+ public static final ConfigOption<Boolean> LOOKUP_WAIT =
+ key("lookup-wait")
.booleanType()
.defaultValue(true)
+ .withFallbackKeys("changelog-producer.lookup-wait")
.withDescription(
- "When "
- + CoreOptions.CHANGELOG_PRODUCER.key()
- + " is set to "
- + ChangelogProducer.LOOKUP.name()
- + ", commit will wait for changelog generation by lookup.");
+ "When need to lookup, commit will wait for compaction by lookup.");
public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
key("metadata.iceberg-compatible")
@@ -2036,8 +2033,11 @@ public class CoreOptions implements Serializable {
}
public boolean prepareCommitWaitCompaction() {
- return changelogProducer() == ChangelogProducer.LOOKUP
- && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+ if (!needLookup()) {
+ return false;
+ }
+
+ return options.get(LOOKUP_WAIT);
}
public boolean metadataIcebergCompatible() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
index b8d134916..bf5445fc1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
@@ -64,4 +64,26 @@ public class CoreOptionsTest {
assertThat(new CoreOptions(conf).startupMode())
.isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
}
+
+ @Test
+ public void testPrepareCommitWaitCompaction() {
+ Options conf = new Options();
+ CoreOptions options = new CoreOptions(conf);
+
+ assertThat(options.prepareCommitWaitCompaction()).isFalse();
+
+ conf.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+ assertThat(options.prepareCommitWaitCompaction()).isTrue();
+ conf.remove(CoreOptions.DELETION_VECTORS_ENABLED.key());
+
+ conf.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.FIRST_ROW);
+ assertThat(options.prepareCommitWaitCompaction()).isTrue();
+ conf.remove(CoreOptions.MERGE_ENGINE.key());
+
+ conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
+ assertThat(options.prepareCommitWaitCompaction()).isTrue();
+
+ conf.set(CoreOptions.LOOKUP_WAIT, false);
+ assertThat(options.prepareCommitWaitCompaction()).isFalse();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f369ec31c..865b2a939 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -134,8 +134,7 @@ public abstract class FlinkSink<T> implements Serializable {
}
}
- if (changelogProducer == ChangelogProducer.LOOKUP
- && !coreOptions.prepareCommitWaitCompaction()) {
+ if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new AsyncLookupSinkWrite(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index b25ec0dfc..f253a3bf8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -275,8 +275,7 @@ public class MultiTablesStoreCompactOperator
}
}
- if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
- && !coreOptions.prepareCommitWaitCompaction()) {
+ if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
new AsyncLookupSinkWrite(
table,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 0d6f37ce6..1a8da3c5b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -487,8 +487,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'lookup',"
- + String.format(
- "'changelog-producer.lookup-wait' = '%s',",
random.nextBoolean())
+ + String.format("'lookup-wait' = '%s',", random.nextBoolean())
+ String.format(
"'deletion-vectors.enabled' = '%s'",
enableDeletionVectors));
@@ -551,8 +550,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'lookup',"
- + String.format(
- "'changelog-producer.lookup-wait' = '%s',",
random.nextBoolean())
+ + String.format("'lookup-wait' = '%s',", random.nextBoolean())
+ "'write-only' = 'true'");
// sleep for a random amount of time to check