This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aaa687e1 [core] Optimize First row merge engine with lookup (#1644)
8aaa687e1 is described below

commit 8aaa687e1d97e99d173dca9f50b630b7914f5422
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 26 10:06:03 2023 +0800

    [core] Optimize First row merge engine with lookup (#1644)
---
 docs/content/concepts/primary-key-table.md         |  13 +-
 .../org/apache/paimon/io/cache/CacheManager.java   |  23 +--
 .../org/apache/paimon/utils/BiFunctionWithIOE.java |  35 ++++
 .../paimon/io/KeyValueFileReaderFactory.java       |  12 ++
 .../apache/paimon/mergetree/ContainsLevels.java    | 194 +++++++++++++++++++++
 .../org/apache/paimon/mergetree/LookupLevels.java  |  96 +++-------
 .../org/apache/paimon/mergetree/LookupUtils.java   | 103 +++++++++++
 .../compact/ChangelogMergeTreeRewriter.java        |  17 ++
 ....java => FirstRowMergeTreeCompactRewriter.java} |  92 +++++++---
 .../compact/FullChangelogMergeFunctionWrapper.java |   6 -
 .../LookupChangelogMergeFunctionWrapper.java       |  10 +-
 .../mergetree/compact/LookupMergeFunction.java     |  11 +-
 .../compact/LookupMergeTreeCompactRewriter.java    |  15 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  40 +++++
 .../org/apache/paimon/schema/SchemaValidation.java |  24 ++-
 ...okupLevelsTest.java => ContainsLevelsTest.java} |  66 +++----
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   8 +-
 .../FullChangelogMergeFunctionWrapperTestBase.java | 122 +------------
 .../LookupChangelogMergeFunctionWrapperTest.java   |  38 ++--
 .../mergetree/compact/SortMergeReaderTestBase.java |   7 +-
 .../mergetree/compact/UniversalCompactionTest.java |  26 +++
 .../paimon/flink/lookup/RocksDBListState.java      |  32 ++--
 .../apache/paimon/flink/lookup/RocksDBState.java   |  11 +-
 .../org/apache/paimon/flink/FirstRowITCase.java    |  71 +++-----
 .../apache/paimon/flink/ReadWriteTableITCase.java  |  18 ++
 25 files changed, 675 insertions(+), 415 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index 868497995..d79bd0718 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -216,15 +216,12 @@ For streaming queries, `aggregation` merge engine must be used together with `lo
 
 ### First Row
 
-By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. It differs from the `deduplicate` merge engine that in the `first-row` merge engine, it will generate insert only changelog. 
+By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. It differs from the
+`deduplicate` merge engine in that the `first-row` merge engine generates insert-only changelog.
 
-{{< hint info >}}
-For streaming queries, `first-row` merge engine must be used together with `lookup` or `full-compaction` [changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
-{{< /hint >}}
-
-{{< hint info >}}
-Currently, only the first row of insert order supported, so you can not specify `sequence.field` for this merge engine. And also not accept `DELETE` and `UPDATE_BEFORE` message.
-{{< /hint>}}
+1. The `first-row` merge engine must be used together with the `lookup` [changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
+2. You cannot specify `sequence.field`.
+3. `DELETE` and `UPDATE_BEFORE` messages are not accepted.
 
 ## Changelog Producers
 
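For reference, a rough sketch of declaring such a table through the Java API, assuming the `Schema` builder from `org.apache.paimon.schema` (the table layout, column names, and sequence field below are invented for illustration; only the option keys and values come from `CoreOptions`):

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class FirstRowSchemaExample {
        public static void main(String[] args) {
            // A first-row table keeps the first value seen per primary key and, per this
            // commit, must use the 'lookup' changelog producer for streaming queries.
            Schema schema =
                    Schema.newBuilder()
                            .column("order_id", DataTypes.BIGINT())
                            .column("status", DataTypes.STRING())
                            .primaryKey("order_id")
                            .option("merge-engine", "first-row")
                            .option("changelog-producer", "lookup")
                            // 'sequence.field' must NOT be set: first-row keeps insert order.
                            .build();
            System.out.println(schema);
        }
    }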
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 92d9bf296..d1eece7bf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -61,17 +61,15 @@ public class CacheManager {
     public MemorySegment getPage(
             RandomAccessFile file, int pageNumber, Consumer<Integer> 
cleanCallback) {
         CacheKey key = new CacheKey(file, pageNumber);
-        CacheValue value;
-        value =
-                cache.get(
-                        key,
-                        cacheKey -> {
-                            try {
-                                return createValue(key, cleanCallback);
-                            } catch (IOException e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
+        CacheValue value = cache.getIfPresent(key);
+        while (value == null || value.isClosed) {
+            try {
+                value = createValue(key, cleanCallback);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            cache.put(key, value);
+        }
         return value.segment;
     }
 
@@ -84,6 +82,7 @@ public class CacheManager {
     }
 
     private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) 
{
+        value.isClosed = true;
         value.cleanCallback.accept(key.pageNumber);
     }
 
@@ -135,6 +134,8 @@ public class CacheManager {
         private final MemorySegment segment;
         private final Consumer<Integer> cleanCallback;
 
+        private boolean isClosed = false;
+
         private CacheValue(MemorySegment segment, Consumer<Integer> 
cleanCallback) {
             this.segment = segment;
             this.cleanCallback = cleanCallback;
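The getPage() change above swaps the loading-cache call for an explicit getIfPresent()/put() loop guarded by the new isClosed flag, so a page that is evicted (and cleaned) between lookup and use is rebuilt instead of being handed out. A minimal, self-contained sketch of that pattern, assuming the unshaded Caffeine API and a toy Value type rather than Paimon's CacheKey/CacheValue:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class ClosedAwareCacheSketch {

        static final class Value {
            volatile boolean isClosed = false;
        }

        private final Cache<Integer, Value> cache =
                Caffeine.newBuilder()
                        .maximumSize(16)
                        // run removal callbacks on the calling thread, as the patch does
                        // for LookupLevels/ContainsLevels via MoreExecutors.directExecutor()
                        .executor(Runnable::run)
                        .removalListener(
                                (Integer key, Value value, RemovalCause cause) -> {
                                    if (value != null) {
                                        value.isClosed = true; // mark evicted entries unusable
                                    }
                                })
                        .build();

        Value getPage(int pageNumber) {
            Value value = cache.getIfPresent(pageNumber);
            // rebuild until we hold an entry that eviction has not closed; put() itself
            // may trigger an eviction, which is why this is a loop and not an if
            while (value == null || value.isClosed) {
                value = new Value();
                cache.put(pageNumber, value);
            }
            return value;
        }

        public static void main(String[] args) {
            ClosedAwareCacheSketch sketch = new ClosedAwareCacheSketch();
            Value first = sketch.getPage(1);
            sketch.cache.invalidateAll();      // eviction closes the cached value ...
            Value second = sketch.getPage(1);  // ... so the next access rebuilds it
            System.out.println(first.isClosed + " " + (first == second)); // true false
        }
    }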
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BiFunctionWithIOE.java b/paimon-common/src/main/java/org/apache/paimon/utils/BiFunctionWithIOE.java
new file mode 100644
index 000000000..bd5f074d6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BiFunctionWithIOE.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+
+/** A bi function with {@link IOException}. */
+@FunctionalInterface
+public interface BiFunctionWithIOE<T, U, R> {
+
+    /**
+     * Applies this function to the given arguments.
+     *
+     * @param t the first function argument
+     * @param u the second function argument
+     * @return the function result
+     */
+    R apply(T t, U u) throws IOException;
+}
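For context, BiFunctionWithIOE is just a two-argument functional interface whose apply() may throw IOException; it is what lets the new LookupUtils further down accept this::lookup / this::contains method references. A small usage sketch (the file-reading lambda is made up for illustration; only paimon-common is assumed on the classpath):

    import org.apache.paimon.utils.BiFunctionWithIOE;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class BiFunctionWithIOEExample {

        // Callers can accept IO-throwing two-argument lambdas without try/catch noise.
        static <T, U, R> R apply(BiFunctionWithIOE<T, U, R> fn, T t, U u) throws IOException {
            return fn.apply(t, u);
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical lambda: read a file and keep at most `limit` characters.
            BiFunctionWithIOE<Path, Integer, String> readPrefix =
                    (path, limit) -> {
                        String text = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
                        return text.substring(0, Math.min(limit, text.length()));
                    };

            Path tmp = Files.createTempFile("bi-function-with-ioe", ".txt");
            Files.write(tmp, "hello, paimon".getBytes(StandardCharsets.UTF_8));
            System.out.println(apply(readPrefix, tmp, 5)); // prints "hello"
            Files.delete(tmp);
        }
    }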
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index e9413715f..71fbbec9a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -157,6 +157,18 @@ public class KeyValueFileReaderFactory {
             applyProjection();
         }
 
+        public Builder copyWithoutProjection() {
+            return new Builder(
+                    fileIO,
+                    schemaManager,
+                    schemaId,
+                    keyType,
+                    valueType,
+                    formatDiscover,
+                    pathFactory,
+                    extractor);
+        }
+
         public Builder withKeyProjection(int[][] projection) {
             keyProjection = projection;
             applyProjection();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
new file mode 100644
index 000000000..953f7a85f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.lookup.LookupStoreFactory;
+import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.IOFunction;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Provide contains key. */
+public class ContainsLevels implements Levels.DropFileCallback, Closeable {
+
+    private static final byte[] EMPTY_VALUE = new byte[0];
+
+    private final Levels levels;
+    private final Comparator<InternalRow> keyComparator;
+    private final RowCompactedSerializer keySerializer;
+    private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
+    private final Supplier<File> localFileFactory;
+    private final LookupStoreFactory lookupStoreFactory;
+
+    private final Cache<String, ContainsFile> containsFiles;
+
+    public ContainsLevels(
+            Levels levels,
+            Comparator<InternalRow> keyComparator,
+            RowType keyType,
+            IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
+            Supplier<File> localFileFactory,
+            LookupStoreFactory lookupStoreFactory,
+            Duration fileRetention,
+            MemorySize maxDiskSize) {
+        this.levels = levels;
+        this.keyComparator = keyComparator;
+        this.keySerializer = new RowCompactedSerializer(keyType);
+        this.fileReaderFactory = fileReaderFactory;
+        this.localFileFactory = localFileFactory;
+        this.lookupStoreFactory = lookupStoreFactory;
+        this.containsFiles =
+                Caffeine.newBuilder()
+                        .expireAfterAccess(fileRetention)
+                        .maximumWeight(maxDiskSize.getKibiBytes())
+                        .weigher(this::fileWeigh)
+                        .removalListener(this::removalCallback)
+                        .executor(MoreExecutors.directExecutor())
+                        .build();
+        levels.addDropFileCallback(this);
+    }
+
+    @VisibleForTesting
+    Cache<String, ContainsFile> containsFiles() {
+        return containsFiles;
+    }
+
+    @Override
+    public void notifyDropFile(String file) {
+        containsFiles.invalidate(file);
+    }
+
+    public boolean contains(InternalRow key, int startLevel) throws IOException {
+        Boolean result = LookupUtils.lookup(levels, key, startLevel, this::contains);
+        return result != null && result;
+    }
+
+    @Nullable
+    private Boolean contains(InternalRow key, SortedRun level) throws IOException {
+        return LookupUtils.lookup(keyComparator, key, level, this::contains);
+    }
+
+    @Nullable
+    private Boolean contains(InternalRow key, DataFileMeta file) throws IOException {
+        ContainsFile containsFile = containsFiles.getIfPresent(file.fileName());
+        while (containsFile == null || containsFile.isClosed) {
+            containsFile = createContainsFile(file);
+            containsFiles.put(file.fileName(), containsFile);
+        }
+        if (containsFile.get(keySerializer.serializeToBytes(key)) != null) {
+            return true;
+        }
+        return null;
+    }
+
+    private int fileWeigh(String file, ContainsFile containsFile) {
+        return fileKibiBytes(containsFile.localFile);
+    }
+
+    private void removalCallback(String key, ContainsFile file, RemovalCause cause) {
+        if (file != null) {
+            try {
+                file.close();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    private ContainsFile createContainsFile(DataFileMeta file) throws IOException {
+        File localFile = localFileFactory.get();
+        if (!localFile.createNewFile()) {
+            throw new IOException("Can not create new file: " + localFile);
+        }
+        try (LookupStoreWriter kvWriter = lookupStoreFactory.createWriter(localFile);
+                RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
+            RecordReader.RecordIterator<KeyValue> batch;
+            KeyValue kv;
+            while ((batch = reader.readBatch()) != null) {
+                while ((kv = batch.next()) != null) {
+                    byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
+                    kvWriter.put(keyBytes, EMPTY_VALUE);
+                }
+                batch.releaseBatch();
+            }
+        } catch (IOException e) {
+            FileIOUtils.deleteFileOrDirectory(localFile);
+            throw e;
+        }
+
+        return new ContainsFile(localFile, lookupStoreFactory.createReader(localFile));
+    }
+
+    @Override
+    public void close() throws IOException {
+        containsFiles.invalidateAll();
+    }
+
+    private static class ContainsFile implements Closeable {
+
+        private final File localFile;
+        private final LookupStoreReader reader;
+
+        private boolean isClosed = false;
+
+        public ContainsFile(File localFile, LookupStoreReader reader) {
+            this.localFile = localFile;
+            this.reader = reader;
+        }
+
+        @Nullable
+        public byte[] get(byte[] key) throws IOException {
+            checkArgument(!isClosed);
+            return reader.lookup(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+            isClosed = true;
+            FileIOUtils.deleteFileOrDirectory(localFile);
+        }
+    }
+}
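ContainsLevels writes only keys into the local lookup file (every value is EMPTY_VALUE), because the first-row rewriter only needs to know whether a key already exists in a higher level, not what its value is. A toy in-memory analogue of that membership-only store (a HashMap stands in for the file built through HashLookupStoreFactory):

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class ContainsStoreSketch {

        private static final byte[] EMPTY_VALUE = new byte[0];

        // Stand-in for the on-disk lookup store: only keys matter, values stay empty.
        private final Map<String, byte[]> store = new HashMap<>();

        void put(byte[] serializedKey) {
            store.put(new String(serializedKey, StandardCharsets.UTF_8), EMPTY_VALUE);
        }

        boolean contains(byte[] serializedKey) {
            // Mirrors ContainsLevels#contains: a non-null lookup result means "present".
            return store.get(new String(serializedKey, StandardCharsets.UTF_8)) != null;
        }

        public static void main(String[] args) {
            ContainsStoreSketch sketch = new ContainsStoreSketch();
            sketch.put("pk-1".getBytes(StandardCharsets.UTF_8));
            System.out.println(sketch.contains("pk-1".getBytes(StandardCharsets.UTF_8))); // true
            System.out.println(sketch.contains("pk-2".getBytes(StandardCharsets.UTF_8))); // false
        }
    }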
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 444e86e95..4fee4b9d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -35,9 +35,10 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.IOFunction;
 
-import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
-import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
-import org.apache.paimon.shade.guava30.com.google.common.cache.RemovalNotification;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import javax.annotation.Nullable;
 
@@ -47,10 +48,11 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Provide lookup by key. */
 public class LookupLevels implements Levels.DropFileCallback, Closeable {
 
@@ -82,11 +84,12 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
         this.localFileFactory = localFileFactory;
         this.lookupStoreFactory = lookupStoreFactory;
         this.lookupFiles =
-                CacheBuilder.newBuilder()
+                Caffeine.newBuilder()
                         .expireAfterAccess(fileRetention)
                         .maximumWeight(maxDiskSize.getKibiBytes())
                         .weigher(this::fileWeigh)
                         .removalListener(this::removalCallback)
+                        .executor(MoreExecutors.directExecutor())
                         .build();
         levels.addDropFileCallback(this);
     }
@@ -103,65 +106,22 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
 
     @Nullable
     public KeyValue lookup(InternalRow key, int startLevel) throws IOException 
{
-        if (startLevel == 0) {
-            throw new IllegalArgumentException("Start level can not be zero.");
-        }
-
-        KeyValue kv = null;
-        for (int i = startLevel; i < levels.numberOfLevels(); i++) {
-            SortedRun level = levels.runOfLevel(i);
-            kv = lookup(key, level);
-            if (kv != null) {
-                break;
-            }
-        }
-
-        return kv;
+        return LookupUtils.lookup(levels, key, startLevel, this::lookup);
     }
 
     @Nullable
-    private KeyValue lookup(InternalRow target, SortedRun level) throws 
IOException {
-        List<DataFileMeta> files = level.files();
-        int left = 0;
-        int right = files.size() - 1;
-
-        // binary search restart positions to find the restart position 
immediately before the
-        // targetKey
-        while (left < right) {
-            int mid = (left + right) / 2;
-
-            if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
-                // Key at "mid.max" is < "target".  Therefore all
-                // files at or before "mid" are uninteresting.
-                left = mid + 1;
-            } else {
-                // Key at "mid.max" is >= "target".  Therefore all files
-                // after "mid" are uninteresting.
-                right = mid;
-            }
-        }
-
-        int index = right;
-
-        // if the index is now pointing to the last file, check if the largest 
key in the block is
-        // than the target key.  If so, we need to seek beyond the end of this 
file
-        if (index == files.size() - 1
-                && keyComparator.compare(files.get(index).maxKey(), target) < 
0) {
-            index++;
-        }
-
-        // if files does not have a next, it means the key does not exist in 
this level
-        return index < files.size() ? lookup(target, files.get(index)) : null;
+    private KeyValue lookup(InternalRow key, SortedRun level) throws 
IOException {
+        return LookupUtils.lookup(keyComparator, key, level, this::lookup);
     }
 
     @Nullable
     private KeyValue lookup(InternalRow key, DataFileMeta file) throws 
IOException {
-        LookupFile lookupFile;
-        try {
-            lookupFile = lookupFiles.get(file.fileName(), () -> 
createLookupFile(file));
-        } catch (ExecutionException e) {
-            throw new IOException(e);
+        LookupFile lookupFile = lookupFiles.getIfPresent(file.fileName());
+        while (lookupFile == null || lookupFile.isClosed) {
+            lookupFile = createLookupFile(file);
+            lookupFiles.put(file.fileName(), lookupFile);
         }
+
         byte[] keyBytes = keySerializer.serializeToBytes(key);
         byte[] valueBytes = lookupFile.get(keyBytes);
         if (valueBytes == null) {
@@ -176,14 +136,13 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
     }
 
     private int fileWeigh(String file, LookupFile lookupFile) {
-        return lookupFile.fileKibiBytes();
+        return fileKibiBytes(lookupFile.localFile);
     }
 
-    private void removalCallback(RemovalNotification<String, LookupFile> 
notification) {
-        LookupFile reader = notification.getValue();
-        if (reader != null) {
+    private void removalCallback(String key, LookupFile file, RemovalCause cause) {
+        if (file != null) {
             try {
-                reader.close();
+                file.close();
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
@@ -231,6 +190,8 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
         private final DataFileMeta remoteFile;
         private final LookupStoreReader reader;
 
+        private boolean isClosed = false;
+
         public LookupFile(File localFile, DataFileMeta remoteFile, 
LookupStoreReader reader) {
             this.localFile = localFile;
             this.remoteFile = remoteFile;
@@ -239,18 +200,10 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
 
         @Nullable
         public byte[] get(byte[] key) throws IOException {
+            checkArgument(!isClosed);
             return reader.lookup(key);
         }
 
-        public int fileKibiBytes() {
-            long kibiBytes = localFile.length() >> 10;
-            if (kibiBytes > Integer.MAX_VALUE) {
-                throw new RuntimeException(
-                        "Lookup file is too big: " + 
MemorySize.ofKibiBytes(kibiBytes));
-            }
-            return (int) kibiBytes;
-        }
-
         public DataFileMeta remoteFile() {
             return remoteFile;
         }
@@ -258,6 +211,7 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
         @Override
         public void close() throws IOException {
             reader.close();
+            isClosed = true;
             FileIOUtils.deleteFileOrDirectory(localFile);
         }
     }
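Both LookupLevels and the new ContainsLevels bound their local-disk usage by weighing each cached file by its size in KiB, so maximumWeight(maxDiskSize.getKibiBytes()) evicts the least recently used lookup files once the configured budget is exceeded, and the direct executor makes the removal callback (which deletes the local file) run immediately. A minimal sketch of that weighting, again with the unshaded Caffeine API and made-up file names and sizes:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class DiskBudgetCacheSketch {

        public static void main(String[] args) {
            long maxDiskKib = 100; // pretend the configured disk budget is 100 KiB

            Cache<String, Long> files =
                    Caffeine.newBuilder()
                            .maximumWeight(maxDiskKib)
                            // weigh each entry by its file size in KiB, like LookupUtils#fileKibiBytes
                            .weigher((String name, Long sizeInBytes) -> (int) (sizeInBytes >> 10))
                            .executor(Runnable::run) // synchronous, like MoreExecutors.directExecutor()
                            .removalListener(
                                    (String name, Long size, RemovalCause cause) ->
                                            System.out.println("evicted " + name + " (" + cause + ")"))
                            .build();

            files.put("lookup-a", 60L << 10); // 60 KiB
            files.put("lookup-b", 60L << 10); // another 60 KiB -> 120 KiB, over budget
            files.cleanUp();                  // force pending eviction work to run now
            System.out.println("resident files: " + files.estimatedSize());
        }
    }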
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupUtils.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupUtils.java
new file mode 100644
index 000000000..977d7ab9f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.BiFunctionWithIOE;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+/** Utils for lookup. */
+public class LookupUtils {
+
+    public static <T> T lookup(
+            Levels levels,
+            InternalRow key,
+            int startLevel,
+            BiFunctionWithIOE<InternalRow, SortedRun, T> lookup)
+            throws IOException {
+        if (startLevel == 0) {
+            throw new IllegalArgumentException("Start level can not be zero.");
+        }
+
+        T result = null;
+        for (int i = startLevel; i < levels.numberOfLevels(); i++) {
+            SortedRun level = levels.runOfLevel(i);
+            result = lookup.apply(key, level);
+            if (result != null) {
+                break;
+            }
+        }
+
+        return result;
+    }
+
+    public static <T> T lookup(
+            Comparator<InternalRow> keyComparator,
+            InternalRow target,
+            SortedRun level,
+            BiFunctionWithIOE<InternalRow, DataFileMeta, T> lookup)
+            throws IOException {
+        List<DataFileMeta> files = level.files();
+        int left = 0;
+        int right = files.size() - 1;
+
+        // binary search restart positions to find the restart position immediately before the
+        // targetKey
+        while (left < right) {
+            int mid = (left + right) / 2;
+
+            if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
+                // Key at "mid.max" is < "target".  Therefore all
+                // files at or before "mid" are uninteresting.
+                left = mid + 1;
+            } else {
+                // Key at "mid.max" is >= "target".  Therefore all files
+                // after "mid" are uninteresting.
+                right = mid;
+            }
+        }
+
+        int index = right;
+
+        // if the index is now pointing to the last file, check if the largest key in the block is
+        // smaller than the target key. If so, we need to seek beyond the end of this file
+        if (index == files.size() - 1
+                && keyComparator.compare(files.get(index).maxKey(), target) < 0) {
+            index++;
+        }
+
+        // if files does not have a next, it means the key does not exist in this level
+        return index < files.size() ? lookup.apply(target, files.get(index)) : null;
+    }
+
+    public static int fileKibiBytes(File file) {
+        long kibiBytes = file.length() >> 10;
+        if (kibiBytes > Integer.MAX_VALUE) {
+            throw new RuntimeException(
+                    "Lookup file is too big: " + 
MemorySize.ofKibiBytes(kibiBytes));
+        }
+        return (int) kibiBytes;
+    }
+}
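LookupUtils.lookup(keyComparator, target, level, ...) relies on the invariant that the files of one sorted run are ordered by key and non-overlapping, so a binary search over each file's maxKey finds the single file that could contain the target. A self-contained sketch of that search, with plain integers standing in for keys and DataFileMeta.maxKey():

    import java.util.Arrays;
    import java.util.List;

    public class SortedRunBinarySearchSketch {

        /** Returns the index of the only file that may contain {@code target}, or -1 if none can. */
        static int fileIndexFor(List<Integer> maxKeys, int target) {
            int left = 0;
            int right = maxKeys.size() - 1;
            while (left < right) {
                int mid = (left + right) / 2;
                if (maxKeys.get(mid) < target) {
                    left = mid + 1; // every file up to mid ends before the target
                } else {
                    right = mid;    // the file at mid could still contain the target
                }
            }
            int index = right;
            // the target is larger than the last file's maxKey: no file can contain it
            if (index == maxKeys.size() - 1 && maxKeys.get(index) < target) {
                index++;
            }
            return index < maxKeys.size() ? index : -1;
        }

        public static void main(String[] args) {
            // maxKey of each file in one sorted run (ordered, non-overlapping)
            List<Integer> maxKeys = Arrays.asList(10, 20, 30);
            System.out.println(fileIndexFor(maxKeys, 15)); // 1: only the second file can hold 15
            System.out.println(fileIndexFor(maxKeys, 10)); // 0
            System.out.println(fileIndexFor(maxKeys, 99)); // -1: beyond the last maxKey
        }
    }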
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 84d884483..efdca7944 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -62,6 +62,23 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
 
     protected abstract MergeFunctionWrapper<ChangelogResult> 
createMergeWrapper(int outputLevel);
 
+    protected boolean rewriteLookupChangelog(int outputLevel, 
List<List<SortedRun>> sections) {
+        if (outputLevel == 0) {
+            return false;
+        }
+
+        for (List<SortedRun> runs : sections) {
+            for (SortedRun run : runs) {
+                for (DataFileMeta file : run.files()) {
+                    if (file.level() == 0) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
     @Override
     public CompactResult rewrite(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) throws Exception {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
similarity index 51%
copy from 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
copy to 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
index d0c710590..d79cef325 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
@@ -19,30 +19,34 @@
 package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.ContainsLevels;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.Filter;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
- * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup 
for the compaction
- * involving level 0 files.
+ * A {@link MergeTreeCompactRewriter} for first row merge engine which 
produces changelog files by
+ * contains for the compaction involving level 0 files.
  */
-public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter 
{
+public class FirstRowMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
 
-    private final LookupLevels lookupLevels;
+    private final ContainsLevels containsLevels;
 
-    public LookupMergeTreeCompactRewriter(
-            LookupLevels lookupLevels,
+    public FirstRowMergeTreeCompactRewriter(
+            ContainsLevels containsLevels,
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
@@ -58,26 +62,13 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
                 mergeSorter,
                 valueEqualiser,
                 changelogRowDeduplicate);
-        this.lookupLevels = lookupLevels;
+        this.containsLevels = containsLevels;
     }
 
     @Override
     protected boolean rewriteChangelog(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
-        if (outputLevel == 0) {
-            return false;
-        }
-
-        for (List<SortedRun> runs : sections) {
-            for (SortedRun run : runs) {
-                for (DataFileMeta file : run.files()) {
-                    if (file.level() == 0) {
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
+        return rewriteLookupChangelog(outputLevel, sections);
     }
 
     @Override
@@ -87,21 +78,66 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
 
     @Override
     protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
-        return new LookupChangelogMergeFunctionWrapper(
+        return new FistRowMergeFunctionWrapper(
                 mfFactory,
                 key -> {
                     try {
-                        return lookupLevels.lookup(key, outputLevel + 1);
+                        return containsLevels.contains(key, outputLevel + 1);
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
-                },
-                valueEqualiser,
-                changelogRowDeduplicate);
+                });
     }
 
     @Override
     public void close() throws IOException {
-        lookupLevels.close();
+        containsLevels.close();
+    }
+
+    @VisibleForTesting
+    static class FistRowMergeFunctionWrapper implements 
MergeFunctionWrapper<ChangelogResult> {
+
+        private final Filter<InternalRow> contains;
+        private final FirstRowMergeFunction mergeFunction;
+        private final ChangelogResult reusedResult = new ChangelogResult();
+
+        public FistRowMergeFunctionWrapper(
+                MergeFunctionFactory<KeyValue> mergeFunctionFactory, 
Filter<InternalRow> contains) {
+            this.contains = contains;
+            MergeFunction<KeyValue> mergeFunction = 
mergeFunctionFactory.create();
+            checkArgument(
+                    mergeFunction instanceof FirstRowMergeFunction,
+                    "Merge function should be a FirstRowMergeFunction, but is 
%s, there is a bug.",
+                    mergeFunction.getClass().getName());
+            this.mergeFunction = (FirstRowMergeFunction) mergeFunction;
+        }
+
+        @Override
+        public void reset() {
+            mergeFunction.reset();
+        }
+
+        @Override
+        public void add(KeyValue kv) {
+            mergeFunction.add(kv);
+        }
+
+        @Override
+        public ChangelogResult getResult() {
+            reusedResult.reset();
+            KeyValue result = mergeFunction.getResult();
+            checkArgument(result != null);
+            if (contains.test(result.key())) {
+                // empty
+                return reusedResult;
+            }
+
+            reusedResult.setResult(result);
+            if (result.level() == 0) {
+                // new record, output changelog
+                return reusedResult.addChangelog(result);
+            }
+            return reusedResult;
+        }
     }
 }
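The nested FistRowMergeFunctionWrapper drops a key entirely when the contains probe finds it in a higher level, and emits an insert-only changelog entry only when the surviving record came from level 0. A toy restatement of that decision, with a plain predicate standing in for the ContainsLevels probe and a made-up Merged record type:

    import java.util.Optional;
    import java.util.function.Predicate;

    public class FirstRowChangelogDecisionSketch {

        // Toy record: the merged "first row" of one key plus the level it came from.
        static final class Merged {
            final String key;
            final int level;

            Merged(String key, int level) {
                this.key = key;
                this.level = level;
            }
        }

        /**
         * Mirrors FistRowMergeFunctionWrapper#getResult: drop the record when a higher level
         * already contains the key, otherwise keep it and emit an INSERT changelog only for
         * genuinely new (level 0) records.
         */
        static Optional<Merged> decide(Merged merged, Predicate<String> higherLevelsContain) {
            if (higherLevelsContain.test(merged.key)) {
                return Optional.empty(); // not the first row: no result, no changelog
            }
            if (merged.level == 0) {
                System.out.println("emit INSERT changelog for " + merged.key);
            }
            return Optional.of(merged);
        }

        public static void main(String[] args) {
            Predicate<String> higherLevels = "k1"::equals; // pretend only k1 exists above

            decide(new Merged("k1", 0), higherLevels); // dropped, no changelog
            decide(new Merged("k2", 0), higherLevels); // kept + INSERT changelog
            decide(new Merged("k3", 3), higherLevels); // kept, no changelog (seen before)
        }
    }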
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
index 86eb16698..0b1e41450 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
@@ -41,7 +41,6 @@ public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<C
     private final int maxLevel;
     private final RecordEqualiser valueEqualiser;
     private final boolean changelogRowDeduplicate;
-    private final boolean isFirstRow;
 
     // only full compaction will write files into maxLevel, see 
UniversalCompaction class
     private KeyValue topLevelKv;
@@ -62,7 +61,6 @@ public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<C
                 "Value count merge function does not need to produce changelog 
from full compaction. "
                         + "Please set changelog producer to 'input'.");
         this.mergeFunction = mergeFunction;
-        this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
         this.maxLevel = maxLevel;
         this.valueEqualiser = valueEqualiser;
         this.changelogRowDeduplicate = changelogRowDeduplicate;
@@ -110,10 +108,6 @@ public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<C
                     reusedResult.addChangelog(replace(reusedAfter, 
RowKind.INSERT, merged));
                 }
             } else {
-                // For first row, we should just return old value. And produce 
no changelog.
-                if (isFirstRow) {
-                    return reusedResult.setResultIfNotRetract(merged);
-                }
                 if (merged == null || !isAdd(merged)) {
                     reusedResult.addChangelog(replace(reusedBefore, 
RowKind.DELETE, topLevelKv));
                 } else if (!changelogRowDeduplicate
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index e862789b2..e405d33cb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -53,7 +53,6 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
     private final KeyValue reusedAfter = new KeyValue();
     private final RecordEqualiser valueEqualiser;
     private final boolean changelogRowDeduplicate;
-    private final boolean isFirstRow;
 
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -66,7 +65,6 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
                 "Merge function should be a LookupMergeFunction, but is %s, 
there is a bug.",
                 mergeFunction.getClass().getName());
         this.mergeFunction = (LookupMergeFunction) mergeFunction;
-        this.isFirstRow = this.mergeFunction.isFirstRow;
         this.mergeFunction2 = mergeFunctionFactory.create();
         this.lookup = lookup;
         this.valueEqualiser = valueEqualiser;
@@ -100,9 +98,7 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
         // 2. With level 0, with the latest high level, return changelog
         if (highLevel != null) {
             // For first row, we should just return old value. And produce no 
changelog.
-            if (!isFirstRow) {
-                setChangelog(highLevel, result);
-            }
+            setChangelog(highLevel, result);
             return reusedResult.setResult(result);
         }
 
@@ -114,9 +110,7 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
             mergeFunction2.add(highLevel);
             mergeFunction2.add(result);
             result = mergeFunction2.getResult();
-            if (!isFirstRow) {
-                setChangelog(highLevel, result);
-            }
+            setChangelog(highLevel, result);
         } else {
             setChangelog(null, result);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index fe8056e4c..64fc78045 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -41,12 +41,10 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     KeyValue highLevel;
     boolean containLevel0;
-    protected final boolean isFirstRow;
 
     public LookupMergeFunction(
             MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType 
valueType) {
         this.mergeFunction = mergeFunction;
-        this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
         this.keySerializer = new InternalRowSerializer(keyType);
         this.valueSerializer = new InternalRowSerializer(valueType);
     }
@@ -66,9 +64,7 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     @Override
     public KeyValue getResult() {
         // 1. Find the latest high level record
-        // For the first row, the candidates should in the highest level.
-        Iterator<KeyValue> descending =
-                isFirstRow ? candidates.iterator() : 
candidates.descendingIterator();
+        Iterator<KeyValue> descending = candidates.descendingIterator();
         while (descending.hasNext()) {
             KeyValue kv = descending.next();
             if (kv.level() > 0) {
@@ -90,6 +86,11 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     public static MergeFunctionFactory<KeyValue> wrap(
             MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType 
valueType) {
+        if (wrapped.create() instanceof FirstRowMergeFunction) {
+            // don't wrap first row, it is already OK
+            return wrapped;
+        }
+
         return new Factory(wrapped, keyType, valueType);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index d0c710590..e5a13b1c6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -64,20 +64,7 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
     @Override
     protected boolean rewriteChangelog(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
-        if (outputLevel == 0) {
-            return false;
-        }
-
-        for (List<SortedRun> runs : sections) {
-            for (SortedRun run : runs) {
-                for (DataFileMeta file : run.files()) {
-                    if (file.level() == 0) {
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
+        return rewriteLookupChangelog(outputLevel, sections);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 176914738..ffd7c6081 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -35,12 +35,14 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.mergetree.ContainsLevels;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeWriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
 import org.apache.paimon.mergetree.compact.CompactStrategy;
+import org.apache.paimon.mergetree.compact.FirstRowMergeTreeCompactRewriter;
 import 
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.LookupCompaction;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
@@ -218,6 +220,23 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         valueEqualiserSupplier.get(),
                         options.changelogRowDeduplicate());
             case LOOKUP:
+                if (options.mergeEngine() == 
CoreOptions.MergeEngine.FIRST_ROW) {
+                    KeyValueFileReaderFactory keyOnlyReader =
+                            readerFactoryBuilder
+                                    .copyWithoutProjection()
+                                    .withValueProjection(new int[0][])
+                                    .build(partition, bucket);
+                    ContainsLevels containsLevels = 
createContainsLevels(levels, keyOnlyReader);
+                    return new FirstRowMergeTreeCompactRewriter(
+                            containsLevels,
+                            readerFactory,
+                            writerFactory,
+                            keyComparator,
+                            mfFactory,
+                            mergeSorter,
+                            valueEqualiserSupplier.get(),
+                            options.changelogRowDeduplicate());
+                }
                 LookupLevels lookupLevels = createLookupLevels(levels, 
readerFactory);
                 return new LookupMergeTreeCompactRewriter(
                         lookupLevels,
@@ -255,4 +274,25 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                 
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
     }
+
+    private ContainsLevels createContainsLevels(
+            Levels levels, KeyValueFileReaderFactory readerFactory) {
+        if (ioManager == null) {
+            throw new RuntimeException(
+                    "Can not use lookup, there is no temp disk directory to 
use.");
+        }
+        return new ContainsLevels(
+                levels,
+                keyComparatorSupplier.get(),
+                keyType,
+                file ->
+                        readerFactory.createRecordReader(
+                                file.schemaId(), file.fileName(), 
file.level()),
+                () -> ioManager.createChannel().getPathFile(),
+                new HashLookupStoreFactory(
+                        cacheManager,
+                        
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)),
+                
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+                
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 72ea13649..37f779d35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.schema;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.WriteMode;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
@@ -82,8 +83,9 @@ public class SchemaValidation {
 
         validateStartupMode(options);
 
+        ChangelogProducer changelogProducer = options.changelogProducer();
         if (options.writeMode() == WriteMode.APPEND_ONLY
-                && options.changelogProducer() != 
CoreOptions.ChangelogProducer.NONE) {
+                && changelogProducer != ChangelogProducer.NONE) {
             throw new UnsupportedOperationException(
                     String.format(
                             "Can not set the %s to %s and %s at the same 
time.",
@@ -100,16 +102,15 @@ public class SchemaValidation {
                         + SNAPSHOT_NUM_RETAINED_MAX.key());
 
         // Only changelog tables with primary keys support full compaction or 
lookup
-        // changelog
-        // producer
+        // changelog producer
         if (options.writeMode() == WriteMode.CHANGE_LOG) {
-            switch (options.changelogProducer()) {
+            switch (changelogProducer) {
                 case FULL_COMPACTION:
                 case LOOKUP:
                     if (schema.primaryKeys().isEmpty()) {
                         throw new UnsupportedOperationException(
                                 "Changelog table with "
-                                        + options.changelogProducer()
+                                        + changelogProducer
                                         + " must have primary keys");
                     }
                     break;
@@ -173,9 +174,16 @@ public class SchemaValidation {
                                 field));
 
         CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW && 
sequenceField.isPresent()) {
-            throw new IllegalArgumentException(
-                    "Do not support use sequence field on FIRST_MERGE merge 
engine");
+        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
+            if (sequenceField.isPresent()) {
+                throw new IllegalArgumentException(
+                        "Do not support use sequence field on FIRST_MERGE 
merge engine");
+            }
+
+            if (changelogProducer != ChangelogProducer.LOOKUP) {
+                throw new IllegalArgumentException(
+                        "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
+            }
         }
     }
 
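In terms of raw table options, the tightened validation amounts to the combinations sketched below (the option keys are the ones defined in CoreOptions; the sequence field name is invented):

    import java.util.HashMap;
    import java.util.Map;

    public class FirstRowOptionsSketch {

        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("merge-engine", "first-row");

            // Accepted: first-row together with the lookup changelog producer.
            options.put("changelog-producer", "lookup");

            // Rejected by the validation above (IllegalArgumentException):
            // options.put("changelog-producer", "full-compaction");
            // options.put("sequence.field", "ts");

            System.out.println(options);
        }
    }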
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
similarity index 78%
copy from 
paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
copy to 
paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 079ebce31..424fe9315 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -60,12 +60,11 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
-import static org.apache.paimon.KeyValue.UNKNOWN_SEQUENCE;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test {@link LookupLevels}. */
-public class LookupLevelsTest {
+/** Test {@link ContainsLevels}. */
+public class ContainsLevelsTest {
 
     private static final String LOOKUP_FILE_PREFIX = "lookup-";
 
@@ -88,35 +87,22 @@ public class LookupLevelsTest {
                                 newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
                                 newFile(2, kv(2, 22), kv(5, 55))),
                         3);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofMebiBytes(10));
+        ContainsLevels containsLevels = createContainsLevels(levels, 
MemorySize.ofMebiBytes(10));
 
         // only in level 1
-        KeyValue kv = lookupLevels.lookup(row(1), 1);
-        assertThat(kv).isNotNull();
-        assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
-        assertThat(kv.level()).isEqualTo(1);
-        assertThat(kv.value().getInt(1)).isEqualTo(11);
+        assertThat(containsLevels.contains(row(1), 1)).isTrue();
 
         // only in level 2
-        kv = lookupLevels.lookup(row(2), 1);
-        assertThat(kv).isNotNull();
-        assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
-        assertThat(kv.level()).isEqualTo(2);
-        assertThat(kv.value().getInt(1)).isEqualTo(22);
+        assertThat(containsLevels.contains(row(2), 1)).isTrue();
 
         // both in level 1 and level 2
-        kv = lookupLevels.lookup(row(5), 1);
-        assertThat(kv).isNotNull();
-        assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
-        assertThat(kv.level()).isEqualTo(1);
-        assertThat(kv.value().getInt(1)).isEqualTo(5);
+        assertThat(containsLevels.contains(row(5), 1)).isTrue();
 
         // no exists
-        kv = lookupLevels.lookup(row(4), 1);
-        assertThat(kv).isNull();
+        assertThat(containsLevels.contains(row(4), 1)).isFalse();
 
-        lookupLevels.close();
-        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+        containsLevels.close();
+        
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
     }
 
     @Test
@@ -130,7 +116,7 @@ public class LookupLevelsTest {
                                 newFile(1, kv(7, 77), kv(8, 88)),
                                 newFile(1, kv(10, 1010), kv(11, 1111))),
                         1);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofMebiBytes(10));
+        ContainsLevels containsLevels = createContainsLevels(levels, 
MemorySize.ofMebiBytes(10));
 
         Map<Integer, Integer> contains =
                 new HashMap<Integer, Integer>() {
@@ -146,28 +132,23 @@ public class LookupLevelsTest {
                     }
                 };
         for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
-            KeyValue kv = lookupLevels.lookup(row(entry.getKey()), 1);
-            assertThat(kv).isNotNull();
-            assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
-            assertThat(kv.level()).isEqualTo(1);
-            assertThat(kv.value().getInt(1)).isEqualTo(entry.getValue());
+            assertThat(containsLevels.contains(row(entry.getKey()), 
1)).isTrue();
         }
 
         int[] notContains = new int[] {0, 3, 6, 9, 12};
         for (int key : notContains) {
-            KeyValue kv = lookupLevels.lookup(row(key), 1);
-            assertThat(kv).isNull();
+            assertThat(containsLevels.contains(row(key), 1)).isFalse();
         }
 
-        lookupLevels.close();
-        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+        containsLevels.close();
+        
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
     }
 
     @Test
     public void testMaxDiskSize() throws IOException {
         List<DataFileMeta> files = new ArrayList<>();
         int fileNum = 10;
-        int recordInFile = 100;
+        int recordInFile = 1000;
         for (int i = 0; i < fileNum; i++) {
             List<KeyValue> kvs = new ArrayList<>();
             for (int j = 0; j < recordInFile; j++) {
@@ -177,33 +158,28 @@ public class LookupLevelsTest {
             files.add(newFile(1, kvs.toArray(new KeyValue[0])));
         }
         Levels levels = new Levels(comparator, files, 1);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofKibiBytes(20));
+        ContainsLevels lookupLevels = createContainsLevels(levels, 
MemorySize.ofKibiBytes(50));
 
         for (int i = 0; i < fileNum * recordInFile; i++) {
-            KeyValue kv = lookupLevels.lookup(row(i), 1);
-            assertThat(kv).isNotNull();
-            assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
-            assertThat(kv.level()).isEqualTo(1);
-            assertThat(kv.value().getInt(1)).isEqualTo(i);
+            assertThat(lookupLevels.contains(row(i), 1)).isTrue();
         }
 
         // some files are invalided
-        long fileNumber = lookupLevels.lookupFiles().size();
+        long fileNumber = lookupLevels.containsFiles().estimatedSize();
         String[] lookupFiles =
                 tempDir.toFile().list((dir, name) -> 
name.startsWith(LOOKUP_FILE_PREFIX));
         assertThat(lookupFiles).isNotNull();
         
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
 
         lookupLevels.close();
-        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+        assertThat(lookupLevels.containsFiles().estimatedSize()).isEqualTo(0);
     }
 
-    private LookupLevels createLookupLevels(Levels levels, MemorySize 
maxDiskSize) {
-        return new LookupLevels(
+    private ContainsLevels createContainsLevels(Levels levels, MemorySize 
maxDiskSize) {
+        return new ContainsLevels(
                 levels,
                 comparator,
                 keyType,
-                rowType,
                 file -> createReaderFactory().createRecordReader(0, 
file.fileName(), file.level()),
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(new CacheManager(2048, 
MemorySize.ofMebiBytes(1)), 0.75),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 079ebce31..ef77b75e0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -116,7 +116,7 @@ public class LookupLevelsTest {
         assertThat(kv).isNull();
 
         lookupLevels.close();
-        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+        assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
     @Test
@@ -160,7 +160,7 @@ public class LookupLevelsTest {
         }
 
         lookupLevels.close();
-        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+        assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
     @Test
@@ -188,14 +188,14 @@ public class LookupLevelsTest {
         }
 
         // some files are invalidated
-        long fileNumber = lookupLevels.lookupFiles().size();
+        long fileNumber = lookupLevels.lookupFiles().estimatedSize();
         String[] lookupFiles =
                 tempDir.toFile().list((dir, name) -> 
name.startsWith(LOOKUP_FILE_PREFIX));
         assertThat(lookupFiles).isNotNull();
         
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
 
         lookupLevels.close();
-        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+        assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
     private LookupLevels createLookupLevels(Levels levels, MemorySize 
maxDiskSize) {
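
The assertions above move from size() to estimatedSize() because the lookup-file registry is now a Caffeine cache. One general Caffeine caveat worth keeping in mind when writing such assertions (not specific to this commit): the count is only an estimate while maintenance work is pending, so exact comparisons are usually preceded by cleanUp(). A small sketch using the shaded classes imported elsewhere in this change:

    import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
    import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

    Cache<Integer, Integer> cache = Caffeine.newBuilder().maximumSize(2).build();
    for (int i = 0; i < 10; i++) {
        cache.put(i, i);                   // size-based eviction is performed lazily
    }
    cache.cleanUp();                       // flush pending maintenance work
    long size = cache.estimatedSize();     // at most the configured maximum after cleanUp()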
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index bddd37111..70e361f87 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -20,9 +20,7 @@ package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -40,7 +38,7 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
     private static final int MAX_LEVEL = 3;
 
     private static final RecordEqualiser EQUALISER =
-            (RecordEqualiser) (row1, row2) -> row1.getInt(0) == row2.getInt(0);
+            (row1, row2) -> row1.getInt(0) == row2.getInt(0);
 
     protected FullChangelogMergeFunctionWrapper wrapper;
 
@@ -48,10 +46,6 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
 
     protected abstract boolean changelogRowDeduplicate();
 
-    protected List<List<KeyValue>> getInputKvs() {
-        return INPUT_KVS;
-    }
-
     @BeforeEach
     public void beforeEach() {
         wrapper =
@@ -113,10 +107,9 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
 
     @Test
     public void testFullChangelogMergeFunctionWrapper() {
-        List<List<KeyValue>> inputs = getInputKvs();
-        for (int i = 0; i < inputs.size(); i++) {
+        for (int i = 0; i < INPUT_KVS.size(); i++) {
             wrapper.reset();
-            List<KeyValue> kvs = inputs.get(i);
+            List<KeyValue> kvs = INPUT_KVS.get(i);
             kvs.forEach(kv -> wrapper.add(kv));
             ChangelogResult actualResult = wrapper.getResult();
             List<KeyValue> expectedChangelogs = new ArrayList<>();
@@ -222,113 +215,4 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
             return true;
         }
     }
-
-    /** Test for {@link FirstRowMergeFunction} with {@link 
FullChangelogMergeFunctionWrapper}. */
-    public static class FirstRowMergeFunctionTest
-            extends FullChangelogMergeFunctionWrapperTestBase {
-
-        private static final List<List<KeyValue>> INPUT_KVS =
-                Arrays.asList(
-                        // only 1 insert record, not from top level
-                        Collections.singletonList(
-                                new KeyValue()
-                                        .replace(row(1), 1, RowKind.INSERT, 
row(1))
-                                        .setLevel(0)),
-                        // only 1 delete record, not from top level
-                        Collections.singletonList(
-                                new KeyValue()
-                                        .replace(row(2), 2, RowKind.DELETE, 
row(0))
-                                        .setLevel(0)),
-                        // only 1 insert record, from top level
-                        Collections.singletonList(
-                                new KeyValue()
-                                        .replace(row(3), 3, RowKind.INSERT, 
row(3))
-                                        .setLevel(MAX_LEVEL)),
-                        // multiple records, none from top level
-                        Arrays.asList(
-                                new KeyValue()
-                                        .replace(row(4), 4, RowKind.INSERT, 
row(3))
-                                        .setLevel(0),
-                                new KeyValue()
-                                        .replace(row(4), 5, RowKind.INSERT, 
row(-3))
-                                        .setLevel(0)),
-                        // multiple records, one from top level
-                        Arrays.asList(
-                                new KeyValue()
-                                        .replace(row(6), 8, RowKind.INSERT, 
row(3))
-                                        .setLevel(MAX_LEVEL),
-                                new KeyValue()
-                                        .replace(row(6), 9, RowKind.INSERT, 
row(-3))
-                                        .setLevel(0)),
-                        Arrays.asList(
-                                new KeyValue()
-                                        .replace(row(8), 14, RowKind.INSERT, 
row(3))
-                                        .setLevel(MAX_LEVEL),
-                                new KeyValue()
-                                        .replace(row(8), 15, RowKind.INSERT, 
row(3))
-                                        .setLevel(0)));
-
-        @Override
-        protected List<List<KeyValue>> getInputKvs() {
-            return INPUT_KVS;
-        }
-
-        private final List<KeyValue> expectedBefore =
-                Arrays.asList(
-                        null,
-                        null,
-                        null,
-                        null,
-                        null,
-                        changelogRowDeduplicate()
-                                ? null
-                                : new KeyValue()
-                                        .replace(row(8), 14, 
RowKind.UPDATE_BEFORE, row(3)));
-
-        private final List<KeyValue> expectedAfter =
-                Arrays.asList(
-                        new KeyValue().replace(row(1), 1, RowKind.INSERT, 
row(1)),
-                        null,
-                        null,
-                        new KeyValue().replace(row(4), 4, RowKind.INSERT, 
row(3)),
-                        null,
-                        changelogRowDeduplicate()
-                                ? null
-                                : new KeyValue().replace(row(8), 15, 
RowKind.UPDATE_AFTER, row(3)));
-
-        private final List<KeyValue> expectedResult =
-                Arrays.asList(
-                        new KeyValue().replace(row(1), 1, RowKind.INSERT, 
row(1)),
-                        null,
-                        new KeyValue().replace(row(3), 3, RowKind.INSERT, 
row(3)),
-                        new KeyValue().replace(row(4), 4, RowKind.INSERT, 
row(3)),
-                        new KeyValue().replace(row(6), 8, RowKind.INSERT, 
row(3)),
-                        new KeyValue().replace(row(8), 14, RowKind.INSERT, 
row(3)));
-
-        @Override
-        protected MergeFunction<KeyValue> createMergeFunction() {
-            return new FirstRowMergeFunction(
-                    RowType.of(DataTypes.INT()), RowType.of(DataTypes.INT()));
-        }
-
-        @Override
-        protected boolean changelogRowDeduplicate() {
-            return true;
-        }
-
-        @Override
-        protected KeyValue getExpectedBefore(int idx) {
-            return expectedBefore.get(idx);
-        }
-
-        @Override
-        protected KeyValue getExpectedAfter(int idx) {
-            return expectedAfter.get(idx);
-        }
-
-        @Override
-        protected KeyValue getExpectedResult(int idx) {
-            return expectedResult.get(idx);
-        }
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 892446da6..5cf976313 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -36,8 +36,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.apache.paimon.types.RowKind.DELETE;
@@ -50,7 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class LookupChangelogMergeFunctionWrapperTest {
 
     private static final RecordEqualiser EQUALISER =
-            (RecordEqualiser) (row1, row2) -> row1.getInt(0) == row2.getInt(0);
+            (row1, row2) -> row1.getInt(0) == row2.getInt(0);
 
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
@@ -290,25 +292,16 @@ public class LookupChangelogMergeFunctionWrapperTest {
 
     @Test
     public void testFirstRow() {
-        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
-        LookupChangelogMergeFunctionWrapper function =
-                new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(
-                                projection ->
-                                        new FirstRowMergeFunction(
-                                                new RowType(
-                                                        Lists.list(
-                                                                new DataField(
-                                                                        0, 
"f0", new IntType()))),
-                                                new RowType(
-                                                        Lists.list(
-                                                                new DataField(
-                                                                        1, 
"f1", new IntType())))),
-                                RowType.of(DataTypes.INT()),
-                                RowType.of(DataTypes.INT())),
-                        highLevel::get,
-                        EQUALISER,
-                        false);
+        Set<InternalRow> highLevel = new HashSet<>();
+        FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper function =
+                new 
FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper(
+                        projection ->
+                                new FirstRowMergeFunction(
+                                        new RowType(
+                                                Lists.list(new DataField(0, 
"f0", new IntType()))),
+                                        new RowType(
+                                                Lists.list(new DataField(1, 
"f1", new IntType())))),
+                        highLevel::contains);
 
         // Without level-0
         function.reset();
@@ -361,7 +354,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
 
         // with high level value
         function.reset();
-        highLevel.put(row(1), new KeyValue().replace(row(1), INSERT, row(10)));
+        highLevel.add(row(1));
         function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(0)).setLevel(0));
 
         result = function.getResult();
@@ -369,7 +362,6 @@ public class LookupChangelogMergeFunctionWrapperTest {
         changelogs = result.changelogs();
         assertThat(changelogs).hasSize(0);
         kv = result.result();
-        assertThat(kv).isNotNull();
-        assertThat(kv.value().getInt(0)).isEqualTo(10);
+        assertThat(kv).isNull();
     }
 }
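
The rewritten testFirstRow reflects the new contract: when a key is already present at a higher level, the level-0 record is dropped outright rather than merged with a cached high-level value. A condensed restatement of the final case asserted above:

    // key 1 already exists above level 0
    highLevel.add(row(1));
    function.reset();
    function.add(new KeyValue().replace(row(1), 2, INSERT, row(0)).setLevel(0));

    ChangelogResult result = function.getResult();
    assertThat(result.changelogs()).isEmpty();   // no changelog emitted
    assertThat(result.result()).isNull();        // and the record itself is discarded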
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index a12c5d152..ea4607214 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -160,9 +160,10 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
-            return new FirstRowMergeFunction(
-                    new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
-                    new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))));
+            RowType keyType = new RowType(Lists.list(new DataField(0, "f0", 
new IntType())));
+            RowType valueType = new RowType(Lists.list(new DataField(1, "f1", 
new BigIntType())));
+            return new LookupMergeFunction(
+                    new FirstRowMergeFunction(keyType, valueType), keyType, 
valueType);
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index d549954a7..4c4020343 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -195,6 +195,32 @@ public class UniversalCompactionTest {
                 .isEqualTo(new long[] {27});
     }
 
+    @Test
+    public void testLookup() {
+        LookupCompaction compaction = new LookupCompaction(new 
UniversalCompaction(25, 1, 3));
+
+        // level 0 to max level
+        Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 2, 2));
+        assertThat(pick.isPresent()).isTrue();
+        long[] results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 2, 2, 2});
+        assertThat(pick.get().outputLevel()).isEqualTo(2);
+
+        // level 0 force pick
+        pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 2), 
level(2, 2)));
+        assertThat(pick.isPresent()).isTrue();
+        results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 2, 2});
+        assertThat(pick.get().outputLevel()).isEqualTo(2);
+
+        // level 0 to empty level
+        pick = compaction.pick(3, Arrays.asList(level(0, 1), level(2, 2)));
+        assertThat(pick.isPresent()).isTrue();
+        results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1});
+        assertThat(pick.get().outputLevel()).isEqualTo(1);
+    }
+
     private List<LevelSortedRun> createLevels(int... levels) {
         List<LevelSortedRun> runs = new ArrayList<>();
         for (int size : levels) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
index e1e7a7efe..f2ee7ee43 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
@@ -28,7 +28,6 @@ import org.rocksdb.RocksDBException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /** RocksDB state for key -> List of value. */
 public class RocksDBListState extends RocksDBState<List<InternalRow>> {
@@ -59,21 +58,22 @@ public class RocksDBListState extends 
RocksDBState<List<InternalRow>> {
 
     public List<InternalRow> get(InternalRow key) throws IOException {
         byte[] keyBytes = serializeKey(key);
-        try {
-            return cache.get(
-                    wrap(keyBytes),
-                    () -> {
-                        byte[] valueBytes = db.get(columnFamily, keyBytes);
-                        List<InternalRow> rows =
-                                listSerializer.deserializeList(valueBytes, 
valueSerializer);
-                        if (rows == null) {
-                            return EMPTY;
-                        }
-                        return rows;
-                    });
-        } catch (ExecutionException e) {
-            throw new IOException(e);
-        }
+        return cache.get(
+                wrap(keyBytes),
+                k -> {
+                    byte[] valueBytes;
+                    try {
+                        valueBytes = db.get(columnFamily, keyBytes);
+                    } catch (RocksDBException e) {
+                        throw new RuntimeException(e);
+                    }
+                    List<InternalRow> rows =
+                            listSerializer.deserializeList(valueBytes, 
valueSerializer);
+                    if (rows == null) {
+                        return EMPTY;
+                    }
+                    return rows;
+                });
     }
 
     private byte[] serializeValue(InternalRow value) throws IOException {
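
The loader above switches from Guava's Cache.get(key, Callable), which wraps loader failures in a checked ExecutionException, to Caffeine's Cache.get(key, Function), which does not; the checked RocksDBException therefore has to be converted inside the mapping function. Note also that Caffeine records no entry when the mapping function returns null, which is one reason the loader keeps returning the EMPTY sentinel for absent keys. A condensed sketch of the pattern:

    List<InternalRow> rows = cache.get(
            wrap(keyBytes),
            k -> {
                byte[] valueBytes;
                try {
                    valueBytes = db.get(columnFamily, keyBytes);       // read-through on cache miss
                } catch (RocksDBException e) {
                    throw new RuntimeException(e);                     // mapping functions cannot throw checked exceptions
                }
                List<InternalRow> deserialized =
                        listSerializer.deserializeList(valueBytes, valueSerializer);
                return deserialized == null ? EMPTY : deserialized;    // null would not be cached
            });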
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
index 0e47b6c38..7269d1234 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
@@ -23,8 +23,9 @@ import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputSerializer;
 
-import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
-import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
@@ -70,7 +71,11 @@ public abstract class RocksDBState<CacheV> {
         this.valueInputView = new DataInputDeserializer();
         this.valueOutputView = new DataOutputSerializer(32);
         this.writeOptions = new WriteOptions().setDisableWAL(true);
-        this.cache = 
CacheBuilder.newBuilder().maximumSize(lruCacheSize).build();
+        this.cache =
+                Caffeine.newBuilder()
+                        .maximumSize(lruCacheSize)
+                        .executor(MoreExecutors.directExecutor())
+                        .build();
     }
 
     protected byte[] serializeKey(InternalRow key) throws IOException {
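
By default Caffeine schedules maintenance and removal-listener work asynchronously on ForkJoinPool.commonPool(); the builder above pins it to MoreExecutors.directExecutor(), so eviction bookkeeping runs synchronously on the calling thread. A sketch of the two builder variants (cache key/value types and sizes are illustrative):

    // default: maintenance is scheduled on ForkJoinPool.commonPool()
    Cache<String, byte[]> asyncMaintenance =
            Caffeine.newBuilder().maximumSize(10_000).build();

    // as above: maintenance runs on the caller's thread
    Cache<String, byte[]> directMaintenance =
            Caffeine.newBuilder()
                    .maximumSize(10_000)
                    .executor(MoreExecutors.directExecutor())
                    .build();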
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index cd625b5ac..12af57fea 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -26,81 +26,66 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for first row merge engine. */
 public class FirstRowITCase extends CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
-        return Arrays.asList(
+        return Collections.singletonList(
                 "CREATE TABLE IF NOT EXISTS T ("
                         + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
-                        + " WITH ('merge-engine'='first-row');",
-                "CREATE TABLE IF NOT EXISTS T1 ("
-                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
-                        + " WITH ('merge-engine'='first-row', 
'changelog-producer' = 'lookup');",
-                "CREATE TABLE IF NOT EXISTS T2 ("
-                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
-                        + " WITH ('merge-engine'='first-row', 
'changelog-producer' = 'full-compaction', 'full-compaction.delta-commits' = 
'3');");
+                        + " WITH ('merge-engine'='first-row', 
'file.format'='avro', 'changelog-producer' = 'lookup');");
     }
 
     @Test
-    public void testBatchQuery() {
-        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
-        List<Row> result = batchSql("SELECT * FROM T");
-        
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, 
"1"));
-
-        result = batchSql("SELECT c FROM T");
-        
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
+    public void testIllegal() {
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE ILLEGAL_T (a INT, b INT, 
c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+                                                + " WITH 
('merge-engine'='first-row')"))
+                .hasRootCauseMessage(
+                        "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
     }
 
     @Test
-    public void testReadAfterFullCompaction() {
-        batchSql("ALTER TABLE T SET ('full-compaction.delta-commits'='1')");
-
+    public void testBatchQuery() {
         batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
         List<Row> result = batchSql("SELECT * FROM T");
         
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, 
"1"));
 
-        batchSql("INSERT INTO T VALUES (1, 1, '1'), (2, 2, '2')");
-        result = batchSql("SELECT * FROM T");
-        assertThat(result)
-                .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
-                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+        result = batchSql("SELECT c FROM T");
+        
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
     }
 
     @Test
-    public void testStreamingReadOnFullCompaction() throws Exception {
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T2");
+    public void testStreamingRead() throws Exception {
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T");
 
-        sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 
4, '4')");
+        sql("INSERT INTO T VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 4, 
'4')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(
                         Row.ofKind(RowKind.INSERT, 1, 1, "1"),
                         Row.ofKind(RowKind.INSERT, 2, 2, "2"));
 
-        sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 
3, '3')");
-        assertThat(iterator.collect(1))
-                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3, 
"3"));
-    }
-
-    @Test
-    public void testStreamingReadOnLookup() throws Exception {
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T1");
-
-        sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 
4, '4')");
-        assertThat(iterator.collect(2))
+        sql(
+                "INSERT INTO T VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), 
(3, 3, '3'), (4, 4, '4'), (5, 5, '5'), (6, 6, '6'), (7, 7, '7')");
+        assertThat(iterator.collect(5))
                 .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
-                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+                        Row.ofKind(RowKind.INSERT, 3, 3, "3"),
+                        Row.ofKind(RowKind.INSERT, 4, 4, "4"),
+                        Row.ofKind(RowKind.INSERT, 5, 5, "5"),
+                        Row.ofKind(RowKind.INSERT, 6, 6, "6"),
+                        Row.ofKind(RowKind.INSERT, 7, 7, "7"));
 
-        sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 
3, '3')");
+        sql("INSERT INTO T VALUES(7, 7, '8'), (8, 8, '8')");
         assertThat(iterator.collect(1))
-                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3, 
"3"));
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 8, 8, 
"8"));
     }
 }
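
Per the tightened schema validation, a first-row table now has to declare the lookup changelog producer in its DDL; the full-compaction variant exercised by the removed tests is rejected. A minimal DDL sketch using the same sql() helper as this class (table name is illustrative):

    sql(
            "CREATE TABLE first_row_demo ("
                    + " a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                    + " WITH ("
                    + "   'merge-engine' = 'first-row',"
                    + "   'changelog-producer' = 'lookup')");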
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 0c7636efb..d89f5860e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -72,8 +72,11 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.CoreOptions.WRITE_MODE;
@@ -1482,6 +1485,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         Map<String, String> options = new HashMap<>();
         options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        if (mergeEngine == FIRST_ROW) {
+            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1576,6 +1582,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         options.put(
                 CoreOptions.FIELDS_PREFIX + ".rate." + 
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        if (mergeEngine == FIRST_ROW) {
+            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1656,6 +1665,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         Map<String, String> options = new HashMap<>();
         options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        if (mergeEngine == FIRST_ROW) {
+            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1746,6 +1758,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         Map<String, String> options = new HashMap<>();
         options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        if (mergeEngine == FIRST_ROW) {
+            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+        }
         String table =
                 createTable(
                         Arrays.asList(
@@ -1824,6 +1839,9 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         Map<String, String> options = new HashMap<>();
         options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        if (mergeEngine == FIRST_ROW) {
+            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+        }
         String table =
                 createTable(
                         Arrays.asList(
