This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e26fb64b9 [core] Lookup wait should affect deletion vectors mode and 
first row engine (#3782)
e26fb64b9 is described below

commit e26fb64b9dc95ec4bceacd94a1794d3c8bd7e3da
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 22:16:59 2024 +0800

    [core] Lookup wait should affect deletion vectors mode and first row engine 
(#3782)
---
 docs/content/primary-key-table/compaction.md       |  5 +----
 .../main/java/org/apache/paimon/CoreOptions.java   | 18 +++++++++---------
 .../java/org/apache/paimon/CoreOptionsTest.java    | 22 ++++++++++++++++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  3 +--
 .../sink/MultiTablesStoreCompactOperator.java      |  3 +--
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  6 ++----
 6 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/docs/content/primary-key-table/compaction.md 
b/docs/content/primary-key-table/compaction.md
index c7c866581..e5040a03a 100644
--- a/docs/content/primary-key-table/compaction.md
+++ b/docs/content/primary-key-table/compaction.md
@@ -59,15 +59,12 @@ You can use the following strategies for your table:
 ```shell
 num-sorted-run.stop-trigger = 2147483647
 sort-spill-threshold = 10
-changelog-producer.lookup-wait = false
+lookup-wait = false
 ```
 
 This configuration will generate more files during peak write periods and 
gradually merge into optimal read
 performance during low write periods.
 
-In the case of `'changelog-producer' = 'lookup'`, by default, the lookup will 
be completed at checkpointing, which
-will block the checkpoint. So if you want an asynchronous lookup, you should 
also set `'changelog-producer.lookup-wait' = 'false'`.
-
 ## Dedicated compaction job
 
 In general, if you expect multiple jobs to be written to the same table, you 
need to separate the compaction. You can
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 5b2f8aa77..b368b483a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1279,16 +1279,13 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("Specifies the commit user prefix.");
 
-    public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
-            key("changelog-producer.lookup-wait")
+    public static final ConfigOption<Boolean> LOOKUP_WAIT =
+            key("lookup-wait")
                     .booleanType()
                     .defaultValue(true)
+                    .withFallbackKeys("changelog-producer.lookup-wait")
                     .withDescription(
-                            "When "
-                                    + CoreOptions.CHANGELOG_PRODUCER.key()
-                                    + " is set to "
-                                    + ChangelogProducer.LOOKUP.name()
-                                    + ", commit will wait for changelog 
generation by lookup.");
+                            "When need to lookup, commit will wait for 
compaction by lookup.");
 
     public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
             key("metadata.iceberg-compatible")
@@ -2036,8 +2033,11 @@ public class CoreOptions implements Serializable {
     }
 
     public boolean prepareCommitWaitCompaction() {
-        return changelogProducer() == ChangelogProducer.LOOKUP
-                && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+        if (!needLookup()) {
+            return false;
+        }
+
+        return options.get(LOOKUP_WAIT);
     }
 
     public boolean metadataIcebergCompatible() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
index b8d134916..bf5445fc1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
@@ -64,4 +64,26 @@ public class CoreOptionsTest {
         assertThat(new CoreOptions(conf).startupMode())
                 .isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
     }
+
+    @Test
+    public void testPrepareCommitWaitCompaction() {
+        Options conf = new Options();
+        CoreOptions options = new CoreOptions(conf);
+
+        assertThat(options.prepareCommitWaitCompaction()).isFalse();
+
+        conf.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        assertThat(options.prepareCommitWaitCompaction()).isTrue();
+        conf.remove(CoreOptions.DELETION_VECTORS_ENABLED.key());
+
+        conf.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.FIRST_ROW);
+        assertThat(options.prepareCommitWaitCompaction()).isTrue();
+        conf.remove(CoreOptions.MERGE_ENGINE.key());
+
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, 
CoreOptions.ChangelogProducer.LOOKUP);
+        assertThat(options.prepareCommitWaitCompaction()).isTrue();
+
+        conf.set(CoreOptions.LOOKUP_WAIT, false);
+        assertThat(options.prepareCommitWaitCompaction()).isFalse();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f369ec31c..865b2a939 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -134,8 +134,7 @@ public abstract class FlinkSink<T> implements Serializable {
             }
         }
 
-        if (changelogProducer == ChangelogProducer.LOOKUP
-                && !coreOptions.prepareCommitWaitCompaction()) {
+        if (coreOptions.needLookup() && 
!coreOptions.prepareCommitWaitCompaction()) {
             return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
                 assertNoSinkMaterializer.run();
                 return new AsyncLookupSinkWrite(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index b25ec0dfc..f253a3bf8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -275,8 +275,7 @@ public class MultiTablesStoreCompactOperator
             }
         }
 
-        if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
-                && !coreOptions.prepareCommitWaitCompaction()) {
+        if (coreOptions.needLookup() && 
!coreOptions.prepareCommitWaitCompaction()) {
             return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
                     new AsyncLookupSinkWrite(
                             table,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 0d6f37ce6..1a8da3c5b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -487,8 +487,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                                 "'write-buffer-size' = '%s',",
                                 random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'lookup',"
-                        + String.format(
-                                "'changelog-producer.lookup-wait' = '%s',", 
random.nextBoolean())
+                        + String.format("'lookup-wait' = '%s',", 
random.nextBoolean())
                         + String.format(
                                 "'deletion-vectors.enabled' = '%s'", 
enableDeletionVectors));
 
@@ -551,8 +550,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                                 "'write-buffer-size' = '%s',",
                                 random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'lookup',"
-                        + String.format(
-                                "'changelog-producer.lookup-wait' = '%s',", 
random.nextBoolean())
+                        + String.format("'lookup-wait' = '%s',", 
random.nextBoolean())
                         + "'write-only' = 'true'");
 
         // sleep for a random amount of time to check

Reply via email to