This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 531f6ebb9 [core] Unify contains classes into lookup for first row 
(#2915)
531f6ebb9 is described below

commit 531f6ebb928bea0810319544b2ae65a4b41c7091
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Feb 28 16:12:59 2024 +0800

    [core] Unify contains classes into lookup for first row (#2915)
---
 .../apache/paimon/mergetree/ContainsLevels.java    | 210 ---------------------
 .../org/apache/paimon/mergetree/LookupLevels.java  |  87 ++++++---
 .../compact/ChangelogMergeTreeRewriter.java        |   9 +-
 .../compact/FirstRowMergeTreeCompactRewriter.java  | 165 ----------------
 .../compact/FistRowMergeFunctionWrapper.java       |  75 ++++++++
 .../FullChangelogMergeTreeCompactRewriter.java     |  11 +-
 .../compact/LookupMergeTreeCompactRewriter.java    |  89 +++++++--
 .../paimon/operation/KeyValueFileStoreWrite.java   |  75 +++-----
 .../apache/paimon/table/query/LocalTableQuery.java |  19 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |  38 ++--
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  21 ++-
 .../LookupChangelogMergeFunctionWrapperTest.java   |   4 +-
 12 files changed, 291 insertions(+), 512 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
deleted file mode 100644
index cce18363c..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.mergetree;
-
-import org.apache.paimon.KeyValue;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.lookup.LookupStoreFactory;
-import org.apache.paimon.lookup.LookupStoreReader;
-import org.apache.paimon.lookup.LookupStoreWriter;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BloomFilter;
-import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.IOFunction;
-
-import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.Duration;
-import java.util.Comparator;
-import java.util.TreeSet;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** Provide contains key. */
-public class ContainsLevels implements Levels.DropFileCallback, Closeable {
-
-    private static final byte[] EMPTY_VALUE = new byte[0];
-
-    private final Levels levels;
-    private final Comparator<InternalRow> keyComparator;
-    private final RowCompactedSerializer keySerializer;
-    private final IOFunction<DataFileMeta, RecordReader<KeyValue>> 
fileReaderFactory;
-    private final Supplier<File> localFileFactory;
-    private final LookupStoreFactory lookupStoreFactory;
-    private final Cache<String, ContainsFile> containsFiles;
-    private final Function<Long, BloomFilter.Builder> bfGenerator;
-
-    public ContainsLevels(
-            Levels levels,
-            Comparator<InternalRow> keyComparator,
-            RowType keyType,
-            IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
-            Supplier<File> localFileFactory,
-            LookupStoreFactory lookupStoreFactory,
-            Duration fileRetention,
-            MemorySize maxDiskSize,
-            Function<Long, BloomFilter.Builder> bfGenerator) {
-        this.levels = levels;
-        this.keyComparator = keyComparator;
-        this.keySerializer = new RowCompactedSerializer(keyType);
-        this.fileReaderFactory = fileReaderFactory;
-        this.localFileFactory = localFileFactory;
-        this.lookupStoreFactory = lookupStoreFactory;
-        this.containsFiles =
-                Caffeine.newBuilder()
-                        .expireAfterAccess(fileRetention)
-                        .maximumWeight(maxDiskSize.getKibiBytes())
-                        .weigher(this::fileWeigh)
-                        .removalListener(this::removalCallback)
-                        .executor(MoreExecutors.directExecutor())
-                        .build();
-        levels.addDropFileCallback(this);
-        this.bfGenerator = bfGenerator;
-    }
-
-    @VisibleForTesting
-    Cache<String, ContainsFile> containsFiles() {
-        return containsFiles;
-    }
-
-    @Override
-    public void notifyDropFile(String file) {
-        containsFiles.invalidate(file);
-    }
-
-    public boolean contains(InternalRow key, int startLevel) throws 
IOException {
-        Boolean result =
-                LookupUtils.lookup(levels, key, startLevel, this::contains, 
this::containsLevel0);
-        return result != null && result;
-    }
-
-    @Nullable
-    private Boolean containsLevel0(InternalRow key, TreeSet<DataFileMeta> 
level0)
-            throws IOException {
-        return LookupUtils.lookupLevel0(keyComparator, key, level0, 
this::contains);
-    }
-
-    @Nullable
-    private Boolean contains(InternalRow key, SortedRun level) throws 
IOException {
-        return LookupUtils.lookup(keyComparator, key, level, this::contains);
-    }
-
-    @Nullable
-    private Boolean contains(InternalRow key, DataFileMeta file) throws 
IOException {
-        ContainsFile containsFile = 
containsFiles.getIfPresent(file.fileName());
-        while (containsFile == null || containsFile.isClosed) {
-            containsFile = createContainsFile(file);
-            containsFiles.put(file.fileName(), containsFile);
-        }
-        if (containsFile.get(keySerializer.serializeToBytes(key)) != null) {
-            return true;
-        }
-        return null;
-    }
-
-    private int fileWeigh(String file, ContainsFile containsFile) {
-        return fileKibiBytes(containsFile.localFile);
-    }
-
-    private void removalCallback(String key, ContainsFile file, RemovalCause 
cause) {
-        if (file != null) {
-            try {
-                file.close();
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        }
-    }
-
-    private ContainsFile createContainsFile(DataFileMeta file) throws 
IOException {
-        File localFile = localFileFactory.get();
-        if (!localFile.createNewFile()) {
-            throw new IOException("Can not create new file: " + localFile);
-        }
-        LookupStoreWriter kvWriter =
-                lookupStoreFactory.createWriter(localFile, 
bfGenerator.apply(file.rowCount()));
-        LookupStoreFactory.Context context;
-        try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
-            RecordReader.RecordIterator<KeyValue> batch;
-            KeyValue kv;
-            while ((batch = reader.readBatch()) != null) {
-                while ((kv = batch.next()) != null) {
-                    byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
-                    kvWriter.put(keyBytes, EMPTY_VALUE);
-                }
-                batch.releaseBatch();
-            }
-        } catch (IOException e) {
-            FileIOUtils.deleteFileOrDirectory(localFile);
-            throw e;
-        } finally {
-            context = kvWriter.close();
-        }
-
-        return new ContainsFile(localFile, 
lookupStoreFactory.createReader(localFile, context));
-    }
-
-    @Override
-    public void close() throws IOException {
-        containsFiles.invalidateAll();
-    }
-
-    private static class ContainsFile implements Closeable {
-
-        private final File localFile;
-        private final LookupStoreReader reader;
-
-        private boolean isClosed = false;
-
-        public ContainsFile(File localFile, LookupStoreReader reader) {
-            this.localFile = localFile;
-            this.reader = reader;
-        }
-
-        @Nullable
-        public byte[] get(byte[] key) throws IOException {
-            checkArgument(!isClosed);
-            return reader.lookup(key);
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-            isClosed = true;
-            FileIOUtils.deleteFileOrDirectory(localFile);
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index c35bda970..4869055ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -23,7 +23,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataOutputSerializer;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.lookup.LookupStoreReader;
 import org.apache.paimon.lookup.LookupStoreWriter;
@@ -57,12 +56,12 @@ import static 
org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Provide lookup by key. */
-public class LookupLevels implements Levels.DropFileCallback, Closeable {
+public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
 
     private final Levels levels;
     private final Comparator<InternalRow> keyComparator;
     private final RowCompactedSerializer keySerializer;
-    private final RowCompactedSerializer valueSerializer;
+    private final ValueProcessor<T> valueProcessor;
     private final IOFunction<DataFileMeta, RecordReader<KeyValue>> 
fileReaderFactory;
     private final Supplier<File> localFileFactory;
     private final LookupStoreFactory lookupStoreFactory;
@@ -73,7 +72,7 @@ public class LookupLevels implements Levels.DropFileCallback, 
Closeable {
             Levels levels,
             Comparator<InternalRow> keyComparator,
             RowType keyType,
-            RowType valueType,
+            ValueProcessor<T> valueProcessor,
             IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
             Supplier<File> localFileFactory,
             LookupStoreFactory lookupStoreFactory,
@@ -83,7 +82,7 @@ public class LookupLevels implements Levels.DropFileCallback, 
Closeable {
         this.levels = levels;
         this.keyComparator = keyComparator;
         this.keySerializer = new RowCompactedSerializer(keyType);
-        this.valueSerializer = new RowCompactedSerializer(valueType);
+        this.valueProcessor = valueProcessor;
         this.fileReaderFactory = fileReaderFactory;
         this.localFileFactory = localFileFactory;
         this.lookupStoreFactory = lookupStoreFactory;
@@ -114,23 +113,22 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
     }
 
     @Nullable
-    public KeyValue lookup(InternalRow key, int startLevel) throws IOException 
{
+    public T lookup(InternalRow key, int startLevel) throws IOException {
         return LookupUtils.lookup(levels, key, startLevel, this::lookup, 
this::lookupLevel0);
     }
 
     @Nullable
-    private KeyValue lookupLevel0(InternalRow key, TreeSet<DataFileMeta> 
level0)
-            throws IOException {
+    private T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0) 
throws IOException {
         return LookupUtils.lookupLevel0(keyComparator, key, level0, 
this::lookup);
     }
 
     @Nullable
-    private KeyValue lookup(InternalRow key, SortedRun level) throws 
IOException {
+    private T lookup(InternalRow key, SortedRun level) throws IOException {
         return LookupUtils.lookup(keyComparator, key, level, this::lookup);
     }
 
     @Nullable
-    private KeyValue lookup(InternalRow key, DataFileMeta file) throws 
IOException {
+    private T lookup(InternalRow key, DataFileMeta file) throws IOException {
         LookupFile lookupFile = lookupFiles.getIfPresent(file.fileName());
 
         while (lookupFile == null || lookupFile.isClosed) {
@@ -143,13 +141,8 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
         if (valueBytes == null) {
             return null;
         }
-        InternalRow value = valueSerializer.deserialize(valueBytes);
-        long sequenceNumber =
-                
MemorySegment.wrap(valueBytes).getLongBigEndian(valueBytes.length - 9);
-        RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length - 
1]);
-        return new KeyValue()
-                .replace(key, sequenceNumber, rowKind, value)
-                .setLevel(lookupFile.remoteFile().level());
+
+        return valueProcessor.readFromDisk(key, 
lookupFile.remoteFile().level(), valueBytes);
     }
 
     private int fileWeigh(String file, LookupFile lookupFile) {
@@ -175,17 +168,12 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
                 lookupStoreFactory.createWriter(localFile, 
bfGenerator.apply(file.rowCount()));
         LookupStoreFactory.Context context;
         try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
-            DataOutputSerializer valueOut = new DataOutputSerializer(32);
             RecordReader.RecordIterator<KeyValue> batch;
             KeyValue kv;
             while ((batch = reader.readBatch()) != null) {
                 while ((kv = batch.next()) != null) {
                     byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
-                    valueOut.clear();
-                    
valueOut.write(valueSerializer.serializeToBytes(kv.value()));
-                    valueOut.writeLong(kv.sequenceNumber());
-                    valueOut.writeByte(kv.valueKind().toByteValue());
-                    byte[] valueBytes = valueOut.getCopyOfBuffer();
+                    byte[] valueBytes = valueProcessor.persistToDisk(kv);
                     kvWriter.put(keyBytes, valueBytes);
                 }
                 batch.releaseBatch();
@@ -236,4 +224,57 @@ public class LookupLevels implements 
Levels.DropFileCallback, Closeable {
             FileIOUtils.deleteFileOrDirectory(localFile);
         }
     }
+
+    /** Processor to process value. */
+    public interface ValueProcessor<T> {
+
+        byte[] persistToDisk(KeyValue kv);
+
+        T readFromDisk(InternalRow key, int level, byte[] valueBytes);
+    }
+
+    /** A {@link ValueProcessor} to return {@link KeyValue}. */
+    public static class KeyValueProcessor implements ValueProcessor<KeyValue> {
+
+        private final RowCompactedSerializer valueSerializer;
+
+        public KeyValueProcessor(RowType valueType) {
+            this.valueSerializer = new RowCompactedSerializer(valueType);
+        }
+
+        @Override
+        public byte[] persistToDisk(KeyValue kv) {
+            byte[] vBytes = valueSerializer.serializeToBytes(kv.value());
+            byte[] bytes = new byte[vBytes.length + 8 + 1];
+            MemorySegment segment = MemorySegment.wrap(bytes);
+            segment.put(0, vBytes);
+            segment.putLong(bytes.length - 9, kv.sequenceNumber());
+            segment.put(bytes.length - 1, kv.valueKind().toByteValue());
+            return bytes;
+        }
+
+        @Override
+        public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes) 
{
+            InternalRow value = valueSerializer.deserialize(bytes);
+            long sequenceNumber = 
MemorySegment.wrap(bytes).getLong(bytes.length - 9);
+            RowKind rowKind = RowKind.fromByteValue(bytes[bytes.length - 1]);
+            return new KeyValue().replace(key, sequenceNumber, rowKind, 
value).setLevel(level);
+        }
+    }
+
+    /** A {@link ValueProcessor} to return {@link Boolean} only. */
+    public static class ContainsValueProcessor implements 
ValueProcessor<Boolean> {
+
+        private static final byte[] EMPTY_BYTES = new byte[0];
+
+        @Override
+        public byte[] persistToDisk(KeyValue kv) {
+            return EMPTY_BYTES;
+        }
+
+        @Override
+        public Boolean readFromDisk(InternalRow key, int level, byte[] bytes) {
+            return Boolean.TRUE;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index c57087eea..f056fafaa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -20,7 +20,6 @@ package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
-import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -45,8 +44,6 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
 
     protected final int maxLevel;
     protected final MergeEngine mergeEngine;
-    protected final RecordEqualiser valueEqualiser;
-    protected final boolean changelogRowDeduplicate;
 
     public ChangelogMergeTreeRewriter(
             int maxLevel,
@@ -55,14 +52,10 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            MergeSorter mergeSorter,
-            RecordEqualiser valueEqualiser,
-            boolean changelogRowDeduplicate) {
+            MergeSorter mergeSorter) {
         super(readerFactory, writerFactory, keyComparator, mfFactory, 
mergeSorter);
         this.maxLevel = maxLevel;
         this.mergeEngine = mergeEngine;
-        this.valueEqualiser = valueEqualiser;
-        this.changelogRowDeduplicate = changelogRowDeduplicate;
     }
 
     protected abstract boolean rewriteChangelog(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
deleted file mode 100644
index 24f872da1..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.mergetree.compact;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.KeyValue;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.codegen.RecordEqualiser;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
-import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.mergetree.ContainsLevels;
-import org.apache.paimon.mergetree.MergeSorter;
-import org.apache.paimon.mergetree.SortedRun;
-import org.apache.paimon.utils.Filter;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Comparator;
-import java.util.List;
-
-import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
-import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
-import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/**
- * A {@link MergeTreeCompactRewriter} for first row merge engine which 
produces changelog files by
- * contains for the compaction involving level 0 files.
- */
-public class FirstRowMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
-
-    private final ContainsLevels containsLevels;
-
-    public FirstRowMergeTreeCompactRewriter(
-            int maxLevel,
-            CoreOptions.MergeEngine mergeEngine,
-            ContainsLevels containsLevels,
-            KeyValueFileReaderFactory readerFactory,
-            KeyValueFileWriterFactory writerFactory,
-            Comparator<InternalRow> keyComparator,
-            MergeFunctionFactory<KeyValue> mfFactory,
-            MergeSorter mergeSorter,
-            RecordEqualiser valueEqualiser,
-            boolean changelogRowDeduplicate) {
-        super(
-                maxLevel,
-                mergeEngine,
-                readerFactory,
-                writerFactory,
-                keyComparator,
-                mfFactory,
-                mergeSorter,
-                valueEqualiser,
-                changelogRowDeduplicate);
-        this.containsLevels = containsLevels;
-    }
-
-    @Override
-    protected boolean rewriteChangelog(
-            int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
-        return rewriteLookupChangelog(outputLevel, sections);
-    }
-
-    @Override
-    protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta 
file) {
-        if (file.level() != 0) {
-            return NO_CHANGELOG;
-        }
-
-        if (outputLevel == maxLevel) {
-            return CHANGELOG_NO_REWRITE;
-        }
-
-        // FIRST_ROW must rewrite file, because some records that are already 
at higher level may be
-        // skipped
-        // See LookupMergeFunction, it just returns newly records.
-        return CHANGELOG_WITH_REWRITE;
-    }
-
-    @Override
-    protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
-        return new FistRowMergeFunctionWrapper(
-                mfFactory,
-                key -> {
-                    try {
-                        return containsLevels.contains(key, outputLevel + 1);
-                    } catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
-                });
-    }
-
-    @Override
-    public void close() throws IOException {
-        containsLevels.close();
-    }
-
-    @VisibleForTesting
-    static class FistRowMergeFunctionWrapper implements 
MergeFunctionWrapper<ChangelogResult> {
-
-        private final Filter<InternalRow> contains;
-        private final FirstRowMergeFunction mergeFunction;
-        private final ChangelogResult reusedResult = new ChangelogResult();
-
-        public FistRowMergeFunctionWrapper(
-                MergeFunctionFactory<KeyValue> mergeFunctionFactory, 
Filter<InternalRow> contains) {
-            this.contains = contains;
-            MergeFunction<KeyValue> mergeFunction = 
mergeFunctionFactory.create();
-            checkArgument(
-                    mergeFunction instanceof FirstRowMergeFunction,
-                    "Merge function should be a FirstRowMergeFunction, but is 
%s, there is a bug.",
-                    mergeFunction.getClass().getName());
-            this.mergeFunction = (FirstRowMergeFunction) mergeFunction;
-        }
-
-        @Override
-        public void reset() {
-            mergeFunction.reset();
-        }
-
-        @Override
-        public void add(KeyValue kv) {
-            mergeFunction.add(kv);
-        }
-
-        @Override
-        public ChangelogResult getResult() {
-            reusedResult.reset();
-            KeyValue result = mergeFunction.getResult();
-            if (result == null) {
-                return reusedResult;
-            }
-
-            if (contains.test(result.key())) {
-                // empty
-                return reusedResult;
-            }
-
-            reusedResult.setResult(result);
-            if (result.level() == 0) {
-                // new record, output changelog
-                return reusedResult.addChangelog(result);
-            }
-            return reusedResult;
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FistRowMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FistRowMergeFunctionWrapper.java
new file mode 100644
index 000000000..fe96658c9
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FistRowMergeFunctionWrapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.Filter;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Wrapper for {@link MergeFunction}s to produce changelog by lookup for 
first row. */
+public class FistRowMergeFunctionWrapper implements 
MergeFunctionWrapper<ChangelogResult> {
+
+    private final Filter<InternalRow> contains;
+    private final FirstRowMergeFunction mergeFunction;
+    private final ChangelogResult reusedResult = new ChangelogResult();
+
+    public FistRowMergeFunctionWrapper(
+            MergeFunctionFactory<KeyValue> mergeFunctionFactory, 
Filter<InternalRow> contains) {
+        this.contains = contains;
+        MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
+        checkArgument(
+                mergeFunction instanceof FirstRowMergeFunction,
+                "Merge function should be a FirstRowMergeFunction, but is %s, 
there is a bug.",
+                mergeFunction.getClass().getName());
+        this.mergeFunction = (FirstRowMergeFunction) mergeFunction;
+    }
+
+    @Override
+    public void reset() {
+        mergeFunction.reset();
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        mergeFunction.add(kv);
+    }
+
+    @Override
+    public ChangelogResult getResult() {
+        reusedResult.reset();
+        KeyValue result = mergeFunction.getResult();
+        if (result == null) {
+            return reusedResult;
+        }
+
+        if (contains.test(result.key())) {
+            // empty
+            return reusedResult;
+        }
+
+        reusedResult.setResult(result);
+        if (result.level() == 0) {
+            // new record, output changelog
+            return reusedResult.addChangelog(result);
+        }
+        return reusedResult;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index b8d64fa86..553b0ff5d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -39,6 +39,9 @@ import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.Upg
 /** A {@link MergeTreeCompactRewriter} which produces changelog files for each 
full compaction. */
 public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
 
+    private final RecordEqualiser valueEqualiser;
+    private final boolean changelogRowDeduplicate;
+
     public FullChangelogMergeTreeCompactRewriter(
             int maxLevel,
             CoreOptions.MergeEngine mergeEngine,
@@ -47,7 +50,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
-            RecordEqualiser valueComparator,
+            RecordEqualiser valueEqualiser,
             boolean changelogRowDeduplicate) {
         super(
                 maxLevel,
@@ -56,9 +59,9 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
                 writerFactory,
                 keyComparator,
                 mfFactory,
-                mergeSorter,
-                valueComparator,
-                changelogRowDeduplicate);
+                mergeSorter);
+        this.valueEqualiser = valueEqualiser;
+        this.changelogRowDeduplicate = changelogRowDeduplicate;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index c78ded5ff..b88c26b99 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -42,21 +42,21 @@ import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.Upg
  * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup 
for the compaction
  * involving level 0 files.
  */
-public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter 
{
+public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewriter {
 
-    private final LookupLevels lookupLevels;
+    private final LookupLevels<T> lookupLevels;
+    private final MergeFunctionWrapperFactory<T> wrapperFactory;
 
     public LookupMergeTreeCompactRewriter(
             int maxLevel,
             MergeEngine mergeEngine,
-            LookupLevels lookupLevels,
+            LookupLevels<T> lookupLevels,
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
-            RecordEqualiser valueEqualiser,
-            boolean changelogRowDeduplicate) {
+            MergeFunctionWrapperFactory<T> wrapperFactory) {
         super(
                 maxLevel,
                 mergeEngine,
@@ -64,10 +64,9 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
                 writerFactory,
                 keyComparator,
                 mfFactory,
-                mergeSorter,
-                valueEqualiser,
-                changelogRowDeduplicate);
+                mergeSorter);
         this.lookupLevels = lookupLevels;
+        this.wrapperFactory = wrapperFactory;
     }
 
     @Override
@@ -100,21 +99,73 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
 
     @Override
     protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
-        return new LookupChangelogMergeFunctionWrapper(
-                mfFactory,
-                key -> {
-                    try {
-                        return lookupLevels.lookup(key, outputLevel + 1);
-                    } catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
-                },
-                valueEqualiser,
-                changelogRowDeduplicate);
+        return wrapperFactory.create(mfFactory, outputLevel, lookupLevels);
     }
 
     @Override
     public void close() throws IOException {
         lookupLevels.close();
     }
+
+    /** Factory to create {@link MergeFunctionWrapper}. */
+    public interface MergeFunctionWrapperFactory<T> {
+
+        MergeFunctionWrapper<ChangelogResult> create(
+                MergeFunctionFactory<KeyValue> mfFactory,
+                int outputLevel,
+                LookupLevels<T> lookupLevels);
+    }
+
+    /** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper. 
*/
+    public static class LookupMergeFunctionWrapperFactory
+            implements MergeFunctionWrapperFactory<KeyValue> {
+
+        private final RecordEqualiser valueEqualiser;
+        private final boolean changelogRowDeduplicate;
+
+        public LookupMergeFunctionWrapperFactory(
+                RecordEqualiser valueEqualiser, boolean 
changelogRowDeduplicate) {
+            this.valueEqualiser = valueEqualiser;
+            this.changelogRowDeduplicate = changelogRowDeduplicate;
+        }
+
+        @Override
+        public MergeFunctionWrapper<ChangelogResult> create(
+                MergeFunctionFactory<KeyValue> mfFactory,
+                int outputLevel,
+                LookupLevels<KeyValue> lookupLevels) {
+            return new LookupChangelogMergeFunctionWrapper(
+                    mfFactory,
+                    key -> {
+                        try {
+                            return lookupLevels.lookup(key, outputLevel + 1);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    },
+                    valueEqualiser,
+                    changelogRowDeduplicate);
+        }
+    }
+
+    /** A {@link MergeFunctionWrapperFactory} for first row. */
+    public static class FirstRowMergeFunctionWrapperFactory
+            implements MergeFunctionWrapperFactory<Boolean> {
+
+        @Override
+        public MergeFunctionWrapper<ChangelogResult> create(
+                MergeFunctionFactory<KeyValue> mfFactory,
+                int outputLevel,
+                LookupLevels<Boolean> lookupLevels) {
+            return new FistRowMergeFunctionWrapper(
+                    mfFactory,
+                    key -> {
+                        try {
+                            return lookupLevels.lookup(key, outputLevel + 1) 
!= null;
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    });
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 3073dc8d9..6a18a34dc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -35,17 +35,19 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
-import org.apache.paimon.mergetree.ContainsLevels;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
+import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeWriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
 import org.apache.paimon.mergetree.compact.CompactStrategy;
-import org.apache.paimon.mergetree.compact.FirstRowMergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
 import 
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
+import 
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
+import 
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
@@ -243,74 +245,51 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                                     .copyWithoutProjection()
                                     .withValueProjection(new int[0][])
                                     .build(partition, bucket);
-                    ContainsLevels containsLevels = 
createContainsLevels(levels, keyOnlyReader);
-                    return new FirstRowMergeTreeCompactRewriter(
+                    return new LookupMergeTreeCompactRewriter<>(
                             maxLevel,
                             mergeEngine,
-                            containsLevels,
+                            createLookupLevels(levels, new 
ContainsValueProcessor(), keyOnlyReader),
                             readerFactory,
                             writerFactory,
                             keyComparator,
                             mfFactory,
                             mergeSorter,
-                            valueEqualiserSupplier.get(),
-                            options.changelogRowDeduplicate());
+                            new FirstRowMergeFunctionWrapperFactory());
+                } else {
+                    return new LookupMergeTreeCompactRewriter<>(
+                            maxLevel,
+                            mergeEngine,
+                            createLookupLevels(
+                                    levels, new KeyValueProcessor(valueType), 
readerFactory),
+                            readerFactory,
+                            writerFactory,
+                            keyComparator,
+                            mfFactory,
+                            mergeSorter,
+                            new LookupMergeFunctionWrapperFactory(
+                                    valueEqualiserSupplier.get(),
+                                    options.changelogRowDeduplicate()));
                 }
-                LookupLevels lookupLevels = createLookupLevels(levels, 
readerFactory);
-                return new LookupMergeTreeCompactRewriter(
-                        maxLevel,
-                        mergeEngine,
-                        lookupLevels,
-                        readerFactory,
-                        writerFactory,
-                        keyComparator,
-                        mfFactory,
-                        mergeSorter,
-                        valueEqualiserSupplier.get(),
-                        options.changelogRowDeduplicate());
             default:
                 return new MergeTreeCompactRewriter(
                         readerFactory, writerFactory, keyComparator, 
mfFactory, mergeSorter);
         }
     }
 
-    private LookupLevels createLookupLevels(
-            Levels levels, KeyValueFileReaderFactory readerFactory) {
-        if (ioManager == null) {
-            throw new RuntimeException(
-                    "Can not use lookup, there is no temp disk directory to 
use.");
-        }
-        Options options = this.options.toConfiguration();
-        return new LookupLevels(
-                levels,
-                keyComparatorSupplier.get(),
-                keyType,
-                valueType,
-                file ->
-                        readerFactory.createRecordReader(
-                                file.schemaId(), file.fileName(), 
file.fileSize(), file.level()),
-                () -> ioManager.createChannel().getPathFile(),
-                new HashLookupStoreFactory(
-                        cacheManager,
-                        this.options.cachePageSize(),
-                        options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-                        
options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)),
-                options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
-                options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
-                bfGenerator(options));
-    }
-
-    private ContainsLevels createContainsLevels(
-            Levels levels, KeyValueFileReaderFactory readerFactory) {
+    private <T> LookupLevels<T> createLookupLevels(
+            Levels levels,
+            LookupLevels.ValueProcessor<T> valueProcessor,
+            KeyValueFileReaderFactory readerFactory) {
         if (ioManager == null) {
             throw new RuntimeException(
                     "Can not use lookup, there is no temp disk directory to 
use.");
         }
         Options options = this.options.toConfiguration();
-        return new ContainsLevels(
+        return new LookupLevels<>(
                 levels,
                 keyComparatorSupplier.get(),
                 keyType,
+                valueProcessor,
                 file ->
                         readerFactory.createRecordReader(
                                 file.schemaId(), file.fileName(), 
file.fileSize(), file.level()),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 425d3d621..9f008eb39 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -53,7 +53,7 @@ import static 
org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
 /** Implementation for {@link TableQuery} for caching data and file in local. 
*/
 public class LocalTableQuery implements TableQuery {
 
-    private final Map<BinaryRow, Map<Integer, LookupLevels>> tableView;
+    private final Map<BinaryRow, Map<Integer, LookupLevels<KeyValue>>> 
tableView;
 
     private final CoreOptions options;
 
@@ -125,12 +125,13 @@ public class LocalTableQuery implements TableQuery {
         Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, 
options.numLevels());
         KeyValueFileReaderFactory factory = 
readerFactoryBuilder.build(partition, bucket);
         Options options = this.options.toConfiguration();
-        LookupLevels lookupLevels =
-                new LookupLevels(
+        LookupLevels<KeyValue> lookupLevels =
+                new LookupLevels<>(
                         levels,
                         keyComparatorSupplier.get(),
                         readerFactoryBuilder.keyType(),
-                        readerFactoryBuilder.projectedValueType(),
+                        new LookupLevels.KeyValueProcessor(
+                                readerFactoryBuilder.projectedValueType()),
                         file ->
                                 factory.createRecordReader(
                                         file.schemaId(),
@@ -154,11 +155,11 @@ public class LocalTableQuery implements TableQuery {
     @Override
     public synchronized InternalRow lookup(BinaryRow partition, int bucket, 
InternalRow key)
             throws IOException {
-        Map<Integer, LookupLevels> buckets = tableView.get(partition);
+        Map<Integer, LookupLevels<KeyValue>> buckets = 
tableView.get(partition);
         if (buckets == null || buckets.isEmpty()) {
             return null;
         }
-        LookupLevels lookupLevels = buckets.get(bucket);
+        LookupLevels<KeyValue> lookupLevels = buckets.get(bucket);
         if (lookupLevels == null) {
             return null;
         }
@@ -189,8 +190,10 @@ public class LocalTableQuery implements TableQuery {
 
     @Override
     public void close() throws IOException {
-        for (Map.Entry<BinaryRow, Map<Integer, LookupLevels>> buckets : 
tableView.entrySet()) {
-            for (Map.Entry<Integer, LookupLevels> bucket : 
buckets.getValue().entrySet()) {
+        for (Map.Entry<BinaryRow, Map<Integer, LookupLevels<KeyValue>>> 
buckets :
+                tableView.entrySet()) {
+            for (Map.Entry<Integer, LookupLevels<KeyValue>> bucket :
+                    buckets.getValue().entrySet()) {
                 bucket.getValue().close();
             }
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 6db0abc90..631a6e47f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -64,7 +64,7 @@ import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test {@link ContainsLevels}. */
+/** Test {@link LookupLevels} for contains. */
 public class ContainsLevelsTest {
 
     private static final String LOOKUP_FILE_PREFIX = "lookup-";
@@ -88,22 +88,23 @@ public class ContainsLevelsTest {
                                 newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
                                 newFile(2, kv(2, 22), kv(5, 55))),
                         3);
-        ContainsLevels containsLevels = createContainsLevels(levels, 
MemorySize.ofMebiBytes(10));
+        LookupLevels<Boolean> containsLevels =
+                createContainsLevels(levels, MemorySize.ofMebiBytes(10));
 
         // only in level 1
-        assertThat(containsLevels.contains(row(1), 1)).isTrue();
+        assertThat(containsLevels.lookup(row(1), 1)).isTrue();
 
         // only in level 2
-        assertThat(containsLevels.contains(row(2), 1)).isTrue();
+        assertThat(containsLevels.lookup(row(2), 1)).isTrue();
 
         // both in level 1 and level 2
-        assertThat(containsLevels.contains(row(5), 1)).isTrue();
+        assertThat(containsLevels.lookup(row(5), 1)).isTrue();
 
         // no exists
-        assertThat(containsLevels.contains(row(4), 1)).isFalse();
+        assertThat(containsLevels.lookup(row(4), 1)).isNull();
 
         containsLevels.close();
-        
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
+        assertThat(containsLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
     @Test
@@ -117,7 +118,8 @@ public class ContainsLevelsTest {
                                 newFile(1, kv(7, 77), kv(8, 88)),
                                 newFile(1, kv(10, 1010), kv(11, 1111))),
                         1);
-        ContainsLevels containsLevels = createContainsLevels(levels, 
MemorySize.ofMebiBytes(10));
+        LookupLevels<Boolean> containsLevels =
+                createContainsLevels(levels, MemorySize.ofMebiBytes(10));
 
         Map<Integer, Integer> contains =
                 new HashMap<Integer, Integer>() {
@@ -133,16 +135,16 @@ public class ContainsLevelsTest {
                     }
                 };
         for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
-            assertThat(containsLevels.contains(row(entry.getKey()), 
1)).isTrue();
+            assertThat(containsLevels.lookup(row(entry.getKey()), 1)).isTrue();
         }
 
         int[] notContains = new int[] {0, 3, 6, 9, 12};
         for (int key : notContains) {
-            assertThat(containsLevels.contains(row(key), 1)).isFalse();
+            assertThat(containsLevels.lookup(row(key), 1)).isNull();
         }
 
         containsLevels.close();
-        
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
+        assertThat(containsLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
     @Test
@@ -159,28 +161,30 @@ public class ContainsLevelsTest {
             files.add(newFile(1, kvs.toArray(new KeyValue[0])));
         }
         Levels levels = new Levels(comparator, files, 1);
-        ContainsLevels lookupLevels = createContainsLevels(levels, 
MemorySize.ofKibiBytes(60));
+        LookupLevels<Boolean> lookupLevels =
+                createContainsLevels(levels, MemorySize.ofKibiBytes(60));
 
         for (int i = 0; i < fileNum * recordInFile; i++) {
-            assertThat(lookupLevels.contains(row(i), 1)).isTrue();
+            assertThat(lookupLevels.lookup(row(i), 1)).isTrue();
         }
 
         // some files are invalided
-        long fileNumber = lookupLevels.containsFiles().estimatedSize();
+        long fileNumber = lookupLevels.lookupFiles().estimatedSize();
         String[] lookupFiles =
                 tempDir.toFile().list((dir, name) -> 
name.startsWith(LOOKUP_FILE_PREFIX));
         assertThat(lookupFiles).isNotNull();
         
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
 
         lookupLevels.close();
-        assertThat(lookupLevels.containsFiles().estimatedSize()).isEqualTo(0);
+        assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
     }
 
-    private ContainsLevels createContainsLevels(Levels levels, MemorySize 
maxDiskSize) {
-        return new ContainsLevels(
+    private LookupLevels<Boolean> createContainsLevels(Levels levels, 
MemorySize maxDiskSize) {
+        return new LookupLevels<>(
                 levels,
                 comparator,
                 keyType,
+                new LookupLevels.ContainsValueProcessor(),
                 file ->
                         createReaderFactory()
                                 .createRecordReader(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 4dc7504e5..e7df8fa8d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -89,7 +89,8 @@ public class LookupLevelsTest {
                                 newFile(1, kv(1, 11, 1), kv(3, 33, 2), kv(5, 
5, 3)),
                                 newFile(2, kv(2, 22, 4), kv(5, 55, 5))),
                         3);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofMebiBytes(10));
+        LookupLevels<KeyValue> lookupLevels =
+                createLookupLevels(levels, MemorySize.ofMebiBytes(10));
 
         // only in level 1
         KeyValue kv = lookupLevels.lookup(row(1), 1);
@@ -131,7 +132,8 @@ public class LookupLevelsTest {
                                 newFile(1, kv(7, 77), kv(8, 88)),
                                 newFile(1, kv(10, 1010), kv(11, 1111))),
                         1);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofMebiBytes(10));
+        LookupLevels<KeyValue> lookupLevels =
+                createLookupLevels(levels, MemorySize.ofMebiBytes(10));
 
         Map<Integer, Integer> contains =
                 new HashMap<Integer, Integer>() {
@@ -178,7 +180,8 @@ public class LookupLevelsTest {
             files.add(newFile(1, kvs.toArray(new KeyValue[0])));
         }
         Levels levels = new Levels(comparator, files, 1);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofKibiBytes(20));
+        LookupLevels<KeyValue> lookupLevels =
+                createLookupLevels(levels, MemorySize.ofKibiBytes(20));
 
         for (int i = 0; i < fileNum * recordInFile; i++) {
             KeyValue kv = lookupLevels.lookup(row(i), 1);
@@ -209,7 +212,8 @@ public class LookupLevelsTest {
                                 // empty level 2
                                 newFile(3, kv(2, 22), kv(5, 55))),
                         3);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofMebiBytes(10));
+        LookupLevels<KeyValue> lookupLevels =
+                createLookupLevels(levels, MemorySize.ofMebiBytes(10));
 
         KeyValue kv = lookupLevels.lookup(row(2), 1);
         assertThat(kv).isNotNull();
@@ -225,7 +229,8 @@ public class LookupLevelsTest {
                                 newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
                                 newFile(2, kv(2, 22), kv(5, 55))),
                         3);
-        LookupLevels lookupLevels = createLookupLevels(levels, 
MemorySize.ofMebiBytes(10));
+        LookupLevels<KeyValue> lookupLevels =
+                createLookupLevels(levels, MemorySize.ofMebiBytes(10));
 
         KeyValue kv = lookupLevels.lookup(row(1), 0);
         assertThat(kv).isNotNull();
@@ -250,12 +255,12 @@ public class LookupLevelsTest {
         assertThat(kv.value().getInt(1)).isEqualTo(11);
     }
 
-    private LookupLevels createLookupLevels(Levels levels, MemorySize 
maxDiskSize) {
-        return new LookupLevels(
+    private LookupLevels<KeyValue> createLookupLevels(Levels levels, 
MemorySize maxDiskSize) {
+        return new LookupLevels<>(
                 levels,
                 comparator,
                 keyType,
-                rowType,
+                new LookupLevels.KeyValueProcessor(rowType),
                 file ->
                         createReaderFactory()
                                 .createRecordReader(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index df51eecff..7564ca07d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -296,8 +296,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
     @Test
     public void testFirstRow() {
         Set<InternalRow> highLevel = new HashSet<>();
-        FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper function =
-                new 
FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper(
+        FistRowMergeFunctionWrapper function =
+                new FistRowMergeFunctionWrapper(
                         projection ->
                                 new FirstRowMergeFunction(
                                         new RowType(

Reply via email to