This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 46d9f3d5ce74c14af5144a4e00ec80275f90510a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri May 31 17:31:35 2024 +0800

    [core] First row merge engine supports none changelog producer (#3452)
---
 docs/content/primary-key-table/merge-engine.md     |  5 ++--
 .../main/java/org/apache/paimon/CoreOptions.java   |  3 ++-
 .../org/apache/paimon/lookup/LookupStrategy.java   | 31 +++++++---------------
 .../paimon/operation/KeyValueFileStoreWrite.java   |  7 +++--
 .../org/apache/paimon/schema/SchemaValidation.java |  5 ++--
 .../table/source/InnerStreamTableScanImpl.java     |  3 ++-
 .../LookupChangelogMergeFunctionWrapperTest.java   |  6 ++---
 .../org/apache/paimon/flink/FirstRowITCase.java    | 24 ++++++++---------
 8 files changed, 36 insertions(+), 48 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine.md 
b/docs/content/primary-key-table/merge-engine.md
index bc6c1ee35..24caa5b43 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -326,9 +326,8 @@ By specifying `'merge-engine' = 'first-row'`, users can 
keep the first row of th
 `deduplicate` merge engine that in the `first-row` merge engine, it will 
generate insert only changelog.
 
 {{< hint info >}}
-1. `first-row` merge engine must be used together with `lookup` [changelog 
producer]({{< ref "primary-key-table/changelog-producer" >}}).
-2. You can not specify `sequence.field`.
-3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config 
`ignore-delete` to ignore these two kinds records.
+1. You cannot specify `sequence.field`.
+2. `DELETE` and `UPDATE_BEFORE` messages are not accepted. You can configure `ignore-delete` to ignore these two kinds of records.
    {{< /hint >}}
 
 This is of great help in replacing log deduplication in streaming computation.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 63832638b..d88b9515a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1489,7 +1489,8 @@ public class CoreOptions implements Serializable {
 
     public LookupStrategy lookupStrategy() {
         return LookupStrategy.from(
-                
options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP),
+                mergeEngine().equals(MergeEngine.FIRST_ROW),
+                changelogProducer().equals(ChangelogProducer.LOOKUP),
                 deletionVectorsEnabled());
     }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
index 6c709bcae..24e03ebdb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
@@ -19,38 +19,25 @@
 package org.apache.paimon.lookup;
 
 /** Strategy for lookup. */
-public enum LookupStrategy {
-    NO_LOOKUP(false, false),
-
-    CHANGELOG_ONLY(true, false),
-
-    DELETION_VECTOR_ONLY(false, true),
-
-    CHANGELOG_AND_DELETION_VECTOR(true, true);
+public class LookupStrategy {
 
     public final boolean needLookup;
 
+    public final boolean isFirstRow;
+
     public final boolean produceChangelog;
 
     public final boolean deletionVector;
 
-    LookupStrategy(boolean produceChangelog, boolean deletionVector) {
+    private LookupStrategy(boolean isFirstRow, boolean produceChangelog, 
boolean deletionVector) {
+        this.isFirstRow = isFirstRow;
         this.produceChangelog = produceChangelog;
         this.deletionVector = deletionVector;
-        this.needLookup = produceChangelog || deletionVector;
+        this.needLookup = produceChangelog || deletionVector || isFirstRow;
     }
 
-    public static LookupStrategy from(boolean produceChangelog, boolean 
deletionVector) {
-        for (LookupStrategy strategy : values()) {
-            if (strategy.produceChangelog == produceChangelog
-                    && strategy.deletionVector == deletionVector) {
-                return strategy;
-            }
-        }
-        throw new IllegalArgumentException(
-                "Invalid combination of produceChangelog : "
-                        + produceChangelog
-                        + " and deletionVector : "
-                        + deletionVector);
+    public static LookupStrategy from(
+            boolean isFirstRow, boolean produceChangelog, boolean 
deletionVector) {
+        return new LookupStrategy(isFirstRow, produceChangelog, 
deletionVector);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index b7702ef8a..1bdc3042d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -83,7 +83,6 @@ import java.util.function.Supplier;
 
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
 
 /** {@link FileStoreWrite} for {@link KeyValueFileStore}. */
@@ -273,6 +272,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         int maxLevel = options.numLevels() - 1;
         MergeEngine mergeEngine = options.mergeEngine();
         ChangelogProducer changelogProducer = options.changelogProducer();
+        LookupStrategy lookupStrategy = options.lookupStrategy();
         if (changelogProducer.equals(FULL_COMPACTION)) {
             return new FullChangelogMergeTreeCompactRewriter(
                     maxLevel,
@@ -285,12 +285,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                     mergeSorter,
                     valueEqualiserSupplier.get(),
                     options.changelogRowDeduplicate());
-        } else if (options.needLookup()) {
-            LookupStrategy lookupStrategy = options.lookupStrategy();
+        } else if (lookupStrategy.needLookup) {
             LookupLevels.ValueProcessor<?> processor;
             LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> 
wrapperFactory;
             FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
-            if (mergeEngine == FIRST_ROW) {
+            if (lookupStrategy.isFirstRow) {
                 if (options.deletionVectorsEnabled()) {
                     throw new UnsupportedOperationException(
                             "First row merge engine does not need deletion 
vectors because there is no deletion of old data in this merge engine.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index fab5f31fd..b87dac065 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -171,9 +171,10 @@ public class SchemaValidation {
         }
 
         if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
-            if (options.changelogProducer() != ChangelogProducer.LOOKUP) {
+            if (options.changelogProducer() != ChangelogProducer.LOOKUP
+                    && options.changelogProducer() != ChangelogProducer.NONE) {
                 throw new IllegalArgumentException(
-                        "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
+                        "Only support 'none' and 'lookup' changelog-producer 
on FIRST_MERGE merge engine");
             }
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 3e03dc0a1..b2c74a83d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -145,7 +145,8 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
             ScannedResult scannedResult = (ScannedResult) result;
             currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
-            if 
(options.lookupStrategy().equals(LookupStrategy.DELETION_VECTOR_ONLY)) {
+            LookupStrategy lookupStrategy = options.lookupStrategy();
+            if (!lookupStrategy.produceChangelog && 
lookupStrategy.deletionVector) {
                 // For DELETION_VECTOR_ONLY mode, we need to return the 
remaining data from level 0
                 // in the subsequent plan.
                 nextSnapshotId = currentSnapshotId;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index d43be8aff..dbee0805a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -73,7 +73,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         highLevel::get,
                         EQUALISER,
                         changelogRowDeduplicate,
-                        LookupStrategy.CHANGELOG_ONLY,
+                        LookupStrategy.from(false, true, false),
                         null,
                         null);
 
@@ -233,7 +233,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         key -> null,
                         EQUALISER,
                         changelogRowDeduplicate,
-                        LookupStrategy.CHANGELOG_ONLY,
+                        LookupStrategy.from(false, true, false),
                         null,
                         null);
 
@@ -322,7 +322,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         highLevel::get,
                         EQUALISER,
                         false,
-                        LookupStrategy.CHANGELOG_ONLY,
+                        LookupStrategy.from(false, true, false),
                         null,
                         UserDefinedSeqComparator.create(
                                 RowType.builder().field("f0", 
DataTypes.INT()).build(),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index 5d7927a97..e44aa4c63 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -31,7 +31,6 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for first row merge engine. */
 public class FirstRowITCase extends CatalogITCaseBase {
@@ -45,23 +44,24 @@ public class FirstRowITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testIllegal() {
-        assertThatThrownBy(
-                        () ->
-                                sql(
-                                        "CREATE TABLE ILLEGAL_T (a INT, b INT, 
c STRING, PRIMARY KEY (a) NOT ENFORCED)"
-                                                + " WITH 
('merge-engine'='first-row')"))
-                .hasRootCauseMessage(
-                        "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
+    public void testBatchQueryNoChangelog() {
+        sql(
+                "CREATE TABLE T_NO_CHANGELOG (a INT, b INT, c STRING, PRIMARY 
KEY (a) NOT ENFORCED)"
+                        + " WITH ('merge-engine'='first-row')");
+        testBatchQuery("T_NO_CHANGELOG");
     }
 
     @Test
     public void testBatchQuery() {
-        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
-        List<Row> result = batchSql("SELECT * FROM T");
+        testBatchQuery("T");
+    }
+
+    private void testBatchQuery(String table) {
+        batchSql("INSERT INTO %s VALUES (1, 1, '1'), (1, 2, '2')", table);
+        List<Row> result = batchSql("SELECT * FROM %s", table);
         
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, 
"1"));
 
-        result = batchSql("SELECT c FROM T");
+        result = batchSql("SELECT c FROM %s", table);
         
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
     }
 

Reply via email to