This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cd9746b [core] Introduce lookup changelog producer
5cd9746b is described below

commit 5cd9746b1df2674ee4ca8ab00f07e3dff24848d6
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 15 10:23:31 2023 +0800

    [core] Introduce lookup changelog producer
    
    This closes #590
---
 docs/content/docs/concepts/primary-key-table.md    |  48 +++-
 .../shortcodes/generated/core_configuration.html   |  26 +-
 .../generated/flink_connector_configuration.html   |   6 +
 docs/static/img/changelog-producer-lookup.png      | Bin 0 -> 79473 bytes
 .../table/store/data/RandomAccessInputView.java    |   4 +-
 .../table/store/io/SeekableDataInputView.java      |  25 +-
 .../flink/table/store/io/cache/CacheManager.java   | 137 +++++++++++
 .../store/io/cache/CachedRandomInputView.java      | 108 ++++++++
 .../store/lookup/hash/HashLookupStoreFactory.java  |  12 +-
 .../store/lookup/hash/HashLookupStoreReader.java   | 190 +++-----------
 .../flink/table/store/options/MemorySize.java      |   4 +
 .../apache/flink/table/store/utils/IOFunction.java |  21 +-
 .../apache/flink/table/store/utils/MathUtils.java  |  11 +
 .../flink/table/store/utils/SimpleReadBuffer.java  |  43 ----
 .../flink/table/store/utils/VarLengthIntUtils.java |  23 --
 .../store/io/cache/CachedRandomInputViewTest.java  | 106 ++++++++
 .../lookup/hash/HashLookupStoreFactoryTest.java    |  37 ++-
 .../org/apache/flink/table/store/CoreOptions.java  |  34 ++-
 .../file/append/AppendOnlyCompactManager.java      |   4 +
 .../table/store/file/compact/CompactManager.java   |   3 +-
 .../store/file/compact/NoopCompactManager.java     |   4 +
 .../table/store/file/mergetree/DataFileReader.java |  68 +++++
 .../flink/table/store/file/mergetree/Levels.java   |  23 ++
 .../table/store/file/mergetree/LookupLevels.java   | 268 ++++++++++++++++++++
 .../store/file/mergetree/MergeTreeWriter.java      |   1 +
 .../mergetree/compact/AbstractCompactRewriter.java |   4 +
 .../file/mergetree/compact/CompactRewriter.java    |   3 +-
 .../FullChangelogMergeTreeCompactRewriter.java     |   4 +
 .../file/mergetree/compact/LookupCompaction.java   |  59 +++++
 ...er.java => LookupMergeTreeCompactRewriter.java} |  55 +++--
 .../mergetree/compact/MergeTreeCompactManager.java |   6 +
 .../mergetree/compact/UniversalCompaction.java     |   1 +
 .../file/operation/KeyValueFileStoreWrite.java     |  81 ++++--
 .../store/file/operation/MemoryFileStoreWrite.java |  15 +-
 .../table/store/file/schema/SchemaValidation.java  |  19 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |  10 +-
 .../store/table/source/StreamDataTableScan.java    |  19 +-
 .../table/source/StreamDataTableScanImpl.java      |  32 ++-
 .../store/file/mergetree/LookupLevelsTest.java     | 274 +++++++++++++++++++++
 .../store/connector/FlinkConnectorOptions.java     |  11 +
 .../table/store/connector/sink/FlinkSink.java      |  49 ++--
 .../sink/LookupChangelogStoreSinkWrite.java        |  50 ++++
 .../table/store/connector/CatalogITCaseBase.java   |   5 +
 .../ChangelogWithKeyFileStoreTableITCase.java      | 239 ++++++++++++------
 .../connector/LookupChangelogWithAggITCase.java    |  57 +++++
 .../table/store/connector/LookupJoinITCase.java    |   2 +-
 46 files changed, 1760 insertions(+), 441 deletions(-)

diff --git a/docs/content/docs/concepts/primary-key-table.md 
b/docs/content/docs/concepts/primary-key-table.md
index 84d20247..7cfa1aa0 100644
--- a/docs/content/docs/concepts/primary-key-table.md
+++ b/docs/content/docs/concepts/primary-key-table.md
@@ -60,7 +60,7 @@ For example, let's say Table Store receives three records:
 If the first column is the primary key. The final result will be `<1, 25.2, 
10, 'This is a book'>`.
 
 {{< hint info >}}
-For streaming queries, `partial-update` merge engine must be used together 
with `full-compaction` [changelog producer]({{< ref 
"docs/concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `partial-update` merge engine must be used together 
with `lookup` or `full-compaction` [changelog producer]({{< ref 
"docs/concepts/primary-key-table#changelog-producers" >}}).
 {{< /hint >}}
 
 {{< hint info >}}
@@ -109,7 +109,7 @@ If you allow some functions to ignore retraction messages, 
you can configure:
 `'fields.${field_name}.ignore-retract'='true'`.
 
 {{< hint info >}}
-For streaming queries, `aggregation` merge engine must be used together with 
`full-compaction` [changelog producer]({{< ref 
"docs/concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `aggregation` merge engine must be used together with 
`lookup` or `full-compaction` [changelog producer]({{< ref 
"docs/concepts/primary-key-table#changelog-producers" >}}).
 {{< /hint >}}
 
 ## Changelog Producers
@@ -144,9 +144,51 @@ By specifying `'changelog-producer' = 'input'`, Table 
Store writers rely on thei
 
 {{< img src="/img/changelog-producer-input.png">}}
 
+### Lookup
+
+If your input can’t produce a complete changelog but you still want to get rid 
of the costly normalized operator, you may consider using the `'lookup'` 
changelog producer.
+
+By specifying `'changelog-producer' = 'lookup'`, Table Store will generate 
changelog through `'lookup'` before committing the data writing.
+
+{{< img src="/img/changelog-producer-lookup.png">}}
+
+Lookup caches data in memory and on local disk. You can use the following options to tune performance:
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Option</th>
+      <th class="text-left" style="width: 5%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 60%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>lookup.cache-file-retention</h5></td>
+        <td style="word-wrap: break-word;">1 h</td>
+        <td>Duration</td>
+        <td>The cached files retention time for lookup. After the file 
expires, if there is a need for access, it will be re-read from the DFS to 
build an index on the local disk.</td>
+    </tr>
+    <tr>
+        <td><h5>lookup.cache-max-disk-size</h5></td>
+        <td style="word-wrap: break-word;">unlimited</td>
+        <td>MemorySize</td>
+        <td>Max disk size for lookup cache, you can use this option to limit 
the use of local disks.</td>
+    </tr>
+    <tr>
+        <td><h5>lookup.cache-max-memory-size</h5></td>
+        <td style="word-wrap: break-word;">256 mb</td>
+        <td>MemorySize</td>
+        <td>Max memory size for lookup cache.</td>
+    </tr>
+    </tbody>
+</table>
+
 ### Full Compaction
 
-If your input can’t produce a complete changelog but you still want to get rid 
of the costly normalized operator, you may consider using the full compaction 
changelog producer.
+If you think the resource consumption of 'lookup' is too high, you can consider using the 'full-compaction' changelog producer,
+which decouples data writing from changelog generation and is better suited to scenarios that can tolerate high latency (for example, 10 minutes).
 
 By specifying `'changelog-producer' = 'full-compaction'`, Table Store will 
compare the results between full compactions and produce the differences as 
changelog. The latency of changelog is affected by the frequency of full 
compactions.
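
To make the new producer concrete, here is a minimal sketch of the table options a job could set to enable it and bound its caches. It only assembles the option map; the class name, the concrete sizes, and the choice of not waiting for changelog generation are illustrative assumptions rather than part of this patch, while the keys themselves are the ones documented above.

import java.util.HashMap;
import java.util.Map;

public class LookupChangelogOptionsSketch {

    public static void main(String[] args) {
        // Table options for the 'lookup' changelog producer introduced by this commit.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("changelog-producer", "lookup");

        // Bound the lookup cache: memory, local disk usage, and cached-file retention.
        // The values below are illustrative; defaults are 256 mb / unlimited / 1 h.
        tableOptions.put("lookup.cache-max-memory-size", "256 mb");
        tableOptions.put("lookup.cache-max-disk-size", "10 gb");
        tableOptions.put("lookup.cache-file-retention", "1 h");

        // Flink connector option: let commits proceed without waiting for
        // changelog generation by lookup (the default is true, i.e. wait).
        tableOptions.put("changelog-producer.lookup-wait", "false");

        tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}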
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index fc58a74e..d93339dc 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -30,7 +30,7 @@
             <td><h5>changelog-producer</h5></td>
             <td style="word-wrap: break-word;">none</td>
             <td><p>Enum</p></td>
-            <td>Whether to double write to a changelog file. This changelog 
file keeps the details of data changes, it can be read directly during stream 
reads.<br /><br />Possible values:<ul><li>"none": No changelog 
file.</li><li>"input": Double write to a changelog file when flushing memory 
table, the changelog is from input.</li><li>"full-compaction": Generate 
changelog files with each full compaction.</li></ul></td>
+            <td>Whether to double write to a changelog file. This changelog 
file keeps the details of data changes, it can be read directly during stream 
reads.<br /><br />Possible values:<ul><li>"none": No changelog 
file.</li><li>"input": Double write to a changelog file when flushing memory 
table, the changelog is from input.</li><li>"full-compaction": Generate 
changelog files with each full compaction.</li><li>"lookup": Generate changelog 
files through 'lookup' before committing the d [...]
         </tr>
         <tr>
             <td><h5>commit.force-compact</h5></td>
@@ -122,6 +122,30 @@
             <td>Boolean</td>
             <td>Whether to force the removal of the normalize node when 
streaming read. Note: This is dangerous and is likely to cause data errors if 
downstream is used to calculate aggregation and the input is not complete 
changelog.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.cache-file-retention</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The cached files retention time for lookup. After the file 
expires, if there is a need for access, it will be re-read from the DFS to 
build an index on the local disk.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.cache-max-disk-size</h5></td>
+            <td style="word-wrap: break-word;">9223372036854775807 bytes</td>
+            <td>MemorySize</td>
+            <td>Max disk size for lookup cache, you can use this option to 
limit the use of local disks.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.cache-max-memory-size</h5></td>
+            <td style="word-wrap: break-word;">256 mb</td>
+            <td>MemorySize</td>
+            <td>Max memory size for lookup cache.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.hash-load-factor</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Float</td>
+            <td>The index load factor for lookup.</td>
+        </tr>
         <tr>
             <td><h5>manifest.format</h5></td>
             <td style="word-wrap: break-word;">"avro"</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index aa51d674..092452c9 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -14,6 +14,12 @@
             <td>Duration</td>
             <td>When changelog-producer is set to FULL_COMPACTION, full 
compaction will be constantly triggered after this interval.</td>
         </tr>
+        <tr>
+            <td><h5>changelog-producer.lookup-wait</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When changelog-producer is set to LOOKUP, commit will wait for 
changelog generation by lookup.</td>
+        </tr>
         <tr>
             <td><h5>log.system</h5></td>
             <td style="word-wrap: break-word;">"none"</td>
diff --git a/docs/static/img/changelog-producer-lookup.png 
b/docs/static/img/changelog-producer-lookup.png
new file mode 100644
index 00000000..0540863e
Binary files /dev/null and b/docs/static/img/changelog-producer-lookup.png 
differ
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
index da69a9a8..18de2307 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.data;
 
+import org.apache.flink.table.store.io.SeekableDataInputView;
 import org.apache.flink.table.store.memory.MemorySegment;
 import org.apache.flink.table.store.utils.MathUtils;
 
@@ -25,7 +26,7 @@ import java.io.EOFException;
 import java.util.ArrayList;
 
 /** A {@link AbstractPagedInputView} to read pages in memory. */
-public class RandomAccessInputView extends AbstractPagedInputView {
+public class RandomAccessInputView extends AbstractPagedInputView implements 
SeekableDataInputView {
 
     private final ArrayList<MemorySegment> segments;
 
@@ -54,6 +55,7 @@ public class RandomAccessInputView extends 
AbstractPagedInputView {
         this.limitInLastSegment = limitInLastSegment;
     }
 
+    @Override
     public void setReadPosition(long position) {
         final int bufferNum = (int) (position >>> this.segmentSizeBits);
         final int offset = (int) (position & this.segmentSizeMask);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/SeekableDataInputView.java
similarity index 59%
copy from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
copy to 
flink-table-store-common/src/main/java/org/apache/flink/table/store/io/SeekableDataInputView.java
index d2b62bd1..0da271de 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/SeekableDataInputView.java
@@ -16,19 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.mergetree.compact;
+package org.apache.flink.table.store.io;
 
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.SortedRun;
-
-import java.util.List;
-
-/** Rewrite sections to the files. */
-public interface CompactRewriter {
-
-    CompactResult rewrite(int outputLevel, boolean dropDelete, 
List<List<SortedRun>> sections)
-            throws Exception;
+/**
+ * Interface marking a {@link DataInputView} as seekable. Seekable views can 
set the position where
+ * they read from.
+ */
+public interface SeekableDataInputView extends DataInputView {
 
-    CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
+    /**
+     * Sets the read pointer to the given position.
+     *
+     * @param position The new read position.
+     */
+    void setReadPosition(long position);
 }
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CacheManager.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CacheManager.java
new file mode 100644
index 00000000..7daeae83
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CacheManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.io.cache;
+
+import org.apache.flink.table.store.annotation.VisibleForTesting;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.options.MemorySize;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import 
org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
+public class CacheManager {
+
+    private final int pageSize;
+    private final Cache<CacheKey, CacheValue> cache;
+
+    public CacheManager(int pageSize, MemorySize maxMemorySize) {
+        this.pageSize = pageSize;
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .weigher(this::weigh)
+                        .maximumWeight(maxMemorySize.getBytes())
+                        .removalListener(this::onRemoval)
+                        .build();
+    }
+
+    @VisibleForTesting
+    Cache<CacheKey, CacheValue> cache() {
+        return cache;
+    }
+
+    public int pageSize() {
+        return pageSize;
+    }
+
+    public MemorySegment getPage(
+            RandomAccessFile file, int pageNumber, Consumer<Integer> 
cleanCallback) {
+        CacheKey key = new CacheKey(file, pageNumber);
+        CacheValue value;
+        try {
+            value = cache.get(key, () -> createValue(key, cleanCallback));
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        return value.segment;
+    }
+
+    public void invalidPage(RandomAccessFile file, int pageNumber) {
+        cache.invalidate(new CacheKey(file, pageNumber));
+    }
+
+    private int weigh(CacheKey cacheKey, CacheValue cacheValue) {
+        return cacheValue.segment.size();
+    }
+
+    private void onRemoval(RemovalNotification<CacheKey, CacheValue> 
notification) {
+        
notification.getValue().cleanCallback.accept(notification.getKey().pageNumber);
+    }
+
+    private CacheValue createValue(CacheKey key, Consumer<Integer> 
cleanCallback)
+            throws IOException {
+        return new CacheValue(key.read(pageSize), cleanCallback);
+    }
+
+    private static class CacheKey {
+
+        private final RandomAccessFile file;
+        private final int pageNumber;
+
+        private CacheKey(RandomAccessFile file, int pageNumber) {
+            this.file = file;
+            this.pageNumber = pageNumber;
+        }
+
+        private MemorySegment read(int pageSize) throws IOException {
+            long length = file.length();
+            long pageAddress = (long) pageNumber * pageSize;
+            int len = (int) Math.min(pageSize, length - pageAddress);
+            byte[] bytes = new byte[len];
+            file.seek(pageAddress);
+            file.readFully(bytes);
+            return MemorySegment.wrap(bytes);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CacheKey cacheKey = (CacheKey) o;
+            return pageNumber == cacheKey.pageNumber && Objects.equals(file, 
cacheKey.file);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(file, pageNumber);
+        }
+    }
+
+    private static class CacheValue {
+
+        private final MemorySegment segment;
+        private final Consumer<Integer> cleanCallback;
+
+        private CacheValue(MemorySegment segment, Consumer<Integer> 
cleanCallback) {
+            this.segment = segment;
+            this.cleanCallback = cleanCallback;
+        }
+    }
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CachedRandomInputView.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CachedRandomInputView.java
new file mode 100644
index 00000000..8d8e5f2f
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CachedRandomInputView.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.io.cache;
+
+import org.apache.flink.table.store.data.AbstractPagedInputView;
+import org.apache.flink.table.store.io.SeekableDataInputView;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.utils.MathUtils;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link SeekableDataInputView} to read bytes from {@link 
RandomAccessFile}, the bytes can be
+ * cached to {@link MemorySegment}s in {@link CacheManager}.
+ */
+public class CachedRandomInputView extends AbstractPagedInputView
+        implements SeekableDataInputView, Closeable {
+
+    private final RandomAccessFile file;
+    private final long fileLength;
+    private final CacheManager cacheManager;
+    private final Map<Integer, MemorySegment> segments;
+    private final int segmentSizeBits;
+    private final int segmentSizeMask;
+
+    private int currentSegmentIndex;
+
+    public CachedRandomInputView(File file, CacheManager cacheManager)
+            throws FileNotFoundException {
+        this.file = new RandomAccessFile(file, "r");
+        this.fileLength = file.length();
+        this.cacheManager = cacheManager;
+        this.segments = new HashMap<>();
+        int segmentSize = cacheManager.pageSize();
+        this.segmentSizeBits = MathUtils.log2strict(segmentSize);
+        this.segmentSizeMask = segmentSize - 1;
+
+        this.currentSegmentIndex = -1;
+    }
+
+    @Override
+    public void setReadPosition(long position) {
+        final int pageNumber = (int) (position >>> this.segmentSizeBits);
+        final int offset = (int) (position & this.segmentSizeMask);
+        this.currentSegmentIndex = pageNumber;
+        MemorySegment segment = getCurrentPage();
+        seekInput(segment, offset, getLimitForSegment(segment));
+    }
+
+    private MemorySegment getCurrentPage() {
+        return segments.computeIfAbsent(
+                currentSegmentIndex,
+                key -> cacheManager.getPage(file, currentSegmentIndex, 
this::invalidPage));
+    }
+
+    @Override
+    protected MemorySegment nextSegment(MemorySegment current) throws 
EOFException {
+        currentSegmentIndex++;
+        if ((long) currentSegmentIndex << segmentSizeBits >= fileLength) {
+            throw new EOFException();
+        }
+
+        return getCurrentPage();
+    }
+
+    @Override
+    protected int getLimitForSegment(MemorySegment segment) {
+        return segment.size();
+    }
+
+    private void invalidPage(int pageNumber) {
+        segments.remove(pageNumber);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // copy out to avoid ConcurrentModificationException
+        List<Integer> pages = new ArrayList<>(segments.keySet());
+        pages.forEach(page -> cacheManager.invalidPage(file, page));
+
+        file.close();
+    }
+}
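
Below is a minimal usage sketch of the new CachedRandomInputView together with the CacheManager introduced above, assuming a local file that is at least 8 bytes long; the path, the 1024-byte page size and the 128 KiB cache budget are illustrative assumptions, and the real caller in this patch is HashLookupStoreReader.

import java.io.File;
import java.io.IOException;

import org.apache.flink.table.store.io.cache.CacheManager;
import org.apache.flink.table.store.io.cache.CachedRandomInputView;
import org.apache.flink.table.store.options.MemorySize;

public class CachedReadSketch {

    public static void main(String[] args) throws IOException {
        // Page size must be a power of two (log2strict is used internally).
        CacheManager cacheManager = new CacheManager(1024, MemorySize.ofKibiBytes(128));

        File dataFile = new File("/tmp/lookup-data"); // illustrative path
        try (CachedRandomInputView view = new CachedRandomInputView(dataFile, cacheManager)) {
            // Seek to an absolute offset; the page is pulled through the shared cache,
            // and evicted pages are dropped from the view via the clean callback.
            view.setReadPosition(0);
            System.out.println(view.readLong());
        }
    }
}
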
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
index 895ab08e..d827775a 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.store.lookup.hash;
 
+import org.apache.flink.table.store.io.cache.CacheManager;
 import org.apache.flink.table.store.lookup.LookupStoreFactory;
-import org.apache.flink.table.store.options.MemorySize;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,14 +27,12 @@ import java.io.IOException;
 /** A {@link LookupStoreFactory} which uses hash to lookup records on disk. */
 public class HashLookupStoreFactory implements LookupStoreFactory {
 
+    private final CacheManager cacheManager;
     private final double loadFactor;
-    private final boolean useMmp;
-    private final MemorySize mmpSegmentSize;
 
-    public HashLookupStoreFactory(double loadFactor, boolean useMmp, 
MemorySize mmpSegmentSize) {
+    public HashLookupStoreFactory(CacheManager cacheManager, double 
loadFactor) {
+        this.cacheManager = cacheManager;
         this.loadFactor = loadFactor;
-        this.useMmp = useMmp;
-        this.mmpSegmentSize = mmpSegmentSize;
     }
 
     @Override
@@ -44,6 +42,6 @@ public class HashLookupStoreFactory implements 
LookupStoreFactory {
 
     @Override
     public HashLookupStoreReader createReader(File file) throws IOException {
-        return new HashLookupStoreReader(useMmp, mmpSegmentSize.getBytes(), 
file);
+        return new HashLookupStoreReader(cacheManager, file);
     }
 }
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
index 390d599b..f04908d7 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
@@ -22,9 +22,10 @@
 
 package org.apache.flink.table.store.lookup.hash;
 
+import org.apache.flink.table.store.io.cache.CacheManager;
+import org.apache.flink.table.store.io.cache.CachedRandomInputView;
 import org.apache.flink.table.store.lookup.LookupStoreReader;
 import org.apache.flink.table.store.utils.MurmurHashUtils;
-import org.apache.flink.table.store.utils.SimpleReadBuffer;
 import org.apache.flink.table.store.utils.VarLengthIntUtils;
 
 import org.slf4j.Logger;
@@ -32,15 +33,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
 import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
@@ -55,8 +51,6 @@ public class HashLookupStoreReader
     private static final Logger LOG =
             LoggerFactory.getLogger(HashLookupStoreReader.class.getName());
 
-    // Buffer segment size
-    private final long mmpSegmentSize;
     // Key count for each key length
     private final int[] keyCounts;
     // Slot size for each key length
@@ -65,44 +59,28 @@ public class HashLookupStoreReader
     private final int[] slots;
     // Offset of the index for different key length
     private final int[] indexOffsets;
-    // Offset of the data in the channel
-    private final long dataOffset;
     // Offset of the data for different key length
     private final long[] dataOffsets;
-    // Data size
-    private final long dataSize;
-    // Index and data buffers
-    private MappedByteBuffer indexBuffer;
-    private MappedByteBuffer[] dataBuffers;
-    // FileChannel
-    private RandomAccessFile mappedFile;
-    private FileChannel channel;
-    // Use MMap for data?
-    private final boolean mMapData;
+    // File input view
+    private CachedRandomInputView inputView;
     // Buffers
-    private final SimpleReadBuffer sizeBuffer = new SimpleReadBuffer(new 
byte[5]);
     private final byte[] slotBuffer;
 
-    HashLookupStoreReader(boolean useMmp, long mmpSegmentSize, File file) 
throws IOException {
+    HashLookupStoreReader(CacheManager cacheManager, File file) throws 
IOException {
         // File path
         if (!file.exists()) {
             throw new FileNotFoundException("File " + file.getAbsolutePath() + 
" not found");
         }
         LOG.info("Opening file {}", file.getName());
 
-        this.mmpSegmentSize = mmpSegmentSize;
-        // Check valid segmentSize
-        if (this.mmpSegmentSize > Integer.MAX_VALUE) {
-            throw new IllegalArgumentException("The mmpSegmentSize can't be 
larger than 2GB");
-        }
-
         // Open file and read metadata
         long createdAt;
         FileInputStream inputStream = new FileInputStream(file);
         DataInputStream dataInputStream = new DataInputStream(new 
BufferedInputStream(inputStream));
         // Offset of the index in the channel
-        int indexOffset;
         int keyCount;
+        int indexOffset;
+        long dataOffset;
         try {
             // Time
             createdAt = dataInputStream.readLong();
@@ -136,9 +114,16 @@ public class HashLookupStoreReader
 
             slotBuffer = new byte[maxSlotSize];
 
-            // Read index and data offset
+            // Read index offset and rebase indexOffsets to absolute positions
             indexOffset = dataInputStream.readInt();
+            for (int i = 0; i < indexOffsets.length; i++) {
+                indexOffsets[i] = indexOffset + indexOffsets[i];
+            }
+            // Read data offset and rebase dataOffsets to absolute positions
             dataOffset = dataInputStream.readLong();
+            for (int i = 0; i < dataOffsets.length; i++) {
+                dataOffsets[i] = dataOffset + dataOffsets[i];
+            }
         } finally {
             // Close metadata
             dataInputStream.close();
@@ -146,42 +131,7 @@ public class HashLookupStoreReader
         }
 
         // Create Mapped file in read-only mode
-        mappedFile = new RandomAccessFile(file, "r");
-        channel = mappedFile.getChannel();
-        long fileSize = file.length();
-
-        // Create index buffer
-        indexBuffer =
-                channel.map(FileChannel.MapMode.READ_ONLY, indexOffset, 
dataOffset - indexOffset);
-
-        // Create data buffers
-        dataSize = fileSize - dataOffset;
-
-        // Check if data size fits in memory map limit
-        if (!useMmp) {
-            // Use classical disk read
-            mMapData = false;
-            dataBuffers = null;
-        } else {
-            // Use Mmap
-            mMapData = true;
-
-            // Build data buffers
-            int bufArraySize =
-                    (int) (dataSize / this.mmpSegmentSize)
-                            + ((dataSize % this.mmpSegmentSize != 0) ? 1 : 0);
-            dataBuffers = new MappedByteBuffer[bufArraySize];
-            int bufIdx = 0;
-            for (long offset = 0; offset < dataSize; offset += 
this.mmpSegmentSize) {
-                long remainingFileSize = dataSize - offset;
-                long thisSegmentSize = Math.min(this.mmpSegmentSize, 
remainingFileSize);
-                dataBuffers[bufIdx++] =
-                        channel.map(
-                                FileChannel.MapMode.READ_ONLY,
-                                dataOffset + offset,
-                                thisSegmentSize);
-            }
-        }
+        inputView = new CachedRandomInputView(file, cacheManager);
 
         // logging
         DecimalFormat integerFormat = new DecimalFormat("#,##0.00");
@@ -201,13 +151,8 @@ public class HashLookupStoreReader
                 .append(integerFormat.format((dataOffset - indexOffset) / 
(1024.0 * 1024.0)))
                 .append(" Mb\n");
         statMsg.append("  Data size: ")
-                .append(integerFormat.format((fileSize - dataOffset) / (1024.0 
* 1024.0)))
+                .append(integerFormat.format((file.length() - dataOffset) / 
(1024.0 * 1024.0)))
                 .append(" Mb\n");
-        if (mMapData) {
-            statMsg.append("  Number of memory mapped data buffers: 
").append(dataBuffers.length);
-        } else {
-            statMsg.append("  Memory mapped data disabled, using disk");
-        }
         LOG.info(statMsg.toString());
     }
 
@@ -217,25 +162,23 @@ public class HashLookupStoreReader
         if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
             return null;
         }
-        long hash = MurmurHashUtils.hashBytesPositive(key);
+        int hash = MurmurHashUtils.hashBytesPositive(key);
         int numSlots = slots[keyLength];
         int slotSize = slotSizes[keyLength];
         int indexOffset = indexOffsets[keyLength];
         long dataOffset = dataOffsets[keyLength];
 
         for (int probe = 0; probe < numSlots; probe++) {
-            int slot = (int) ((hash + probe) % numSlots);
-            indexBuffer.position(indexOffset + slot * slotSize);
-            indexBuffer.get(slotBuffer, 0, slotSize);
+            long slot = (hash + probe) % numSlots;
+            inputView.setReadPosition(indexOffset + slot * slotSize);
+            inputView.readFully(slotBuffer, 0, slotSize);
 
             long offset = VarLengthIntUtils.decodeLong(slotBuffer, keyLength);
             if (offset == 0) {
                 return null;
             }
             if (isKey(slotBuffer, key)) {
-                return mMapData
-                        ? getMMapBytes(dataOffset + offset)
-                        : getDiskBytes(dataOffset + offset);
+                return getValue(dataOffset + offset);
             }
         }
         return null;
@@ -250,85 +193,18 @@ public class HashLookupStoreReader
         return true;
     }
 
-    // Read the data at the given offset, the data can be spread over multiple 
data buffers
-    private byte[] getMMapBytes(long offset) {
-        // Read the first 4 bytes to get the size of the data
-        ByteBuffer buf = getDataBuffer(offset);
-        int maxLen = (int) Math.min(5, dataSize - offset);
-
-        int size;
-        if (buf.remaining() >= maxLen) {
-            // Continuous read
-            int pos = buf.position();
-            size = VarLengthIntUtils.decodeInt(buf);
-
-            // Used in case of data is spread over multiple buffers
-            offset += buf.position() - pos;
-        } else {
-            // The size of the data is spread over multiple buffers
-            int len = maxLen;
-            int off = 0;
-            sizeBuffer.reset();
-            while (len > 0) {
-                buf = getDataBuffer(offset + off);
-                int count = Math.min(len, buf.remaining());
-                buf.get(sizeBuffer.getBuf(), off, count);
-                off += count;
-                len -= count;
-            }
-            size = VarLengthIntUtils.decodeInt(sizeBuffer);
-            offset += sizeBuffer.getPos();
-            buf = getDataBuffer(offset);
-        }
-
-        // Create output bytes
-        byte[] res = new byte[size];
-
-        // Check if the data is one buffer
-        if (buf.remaining() >= size) {
-            // Continuous read
-            buf.get(res, 0, size);
-        } else {
-            int len = size;
-            int off = 0;
-            while (len > 0) {
-                buf = getDataBuffer(offset);
-                int count = Math.min(len, buf.remaining());
-                buf.get(res, off, count);
-                offset += count;
-                off += count;
-                len -= count;
-            }
-        }
-
-        return res;
-    }
-
-    // Get data from disk
-    private byte[] getDiskBytes(long offset) throws IOException {
-        mappedFile.seek(dataOffset + offset);
+    private byte[] getValue(long offset) throws IOException {
+        inputView.setReadPosition(offset);
 
         // Get size of data
-        int size = VarLengthIntUtils.decodeInt(mappedFile);
+        int size = VarLengthIntUtils.decodeInt(inputView);
 
         // Create output bytes
         byte[] res = new byte[size];
-
-        // Read data
-        if (mappedFile.read(res) == -1) {
-            throw new EOFException();
-        }
-
+        inputView.readFully(res);
         return res;
     }
 
-    // Return the data buffer for the given position
-    private ByteBuffer getDataBuffer(long index) {
-        ByteBuffer buf = dataBuffers[(int) (index / mmpSegmentSize)];
-        buf.position((int) (index % mmpSegmentSize));
-        return buf;
-    }
-
     private String formatCreatedAt(long createdAt) {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd G 'at' 
HH:mm:ss z");
         Calendar cl = Calendar.getInstance();
@@ -338,12 +214,8 @@ public class HashLookupStoreReader
 
     @Override
     public void close() throws IOException {
-        channel.close();
-        mappedFile.close();
-        indexBuffer = null;
-        dataBuffers = null;
-        mappedFile = null;
-        channel = null;
+        inputView.close();
+        inputView = null;
     }
 
     @Override
@@ -393,11 +265,11 @@ public class HashLookupStoreReader
         @Override
         public FastEntry next() {
             try {
-                indexBuffer.position(currentIndexOffset);
+                inputView.setReadPosition(currentIndexOffset);
 
                 long offset = 0;
                 while (offset == 0) {
-                    indexBuffer.get(currentSlotBuffer);
+                    inputView.readFully(currentSlotBuffer);
                     offset = VarLengthIntUtils.decodeLong(currentSlotBuffer, 
currentKeyLength);
                     currentIndexOffset += currentSlotBuffer.length;
                 }
@@ -407,7 +279,7 @@ public class HashLookupStoreReader
 
                 if (withValue) {
                     long valueOffset = currentDataOffset + offset;
-                    value = mMapData ? getMMapBytes(valueOffset) : 
getDiskBytes(valueOffset);
+                    value = getValue(valueOffset);
                 }
 
                 entry.set(key, value);
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
index 6595c729..b906d66c 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
@@ -84,6 +84,10 @@ public class MemorySize implements java.io.Serializable, 
Comparable<MemorySize>
         return new MemorySize(mebiBytes << 20);
     }
 
+    public static MemorySize ofKibiBytes(long kibiBytes) {
+        return new MemorySize(kibiBytes << 10);
+    }
+
     // ------------------------------------------------------------------------
 
     /** Gets the memory size in bytes. */
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/IOFunction.java
similarity index 59%
copy from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
copy to 
flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/IOFunction.java
index d2b62bd1..a6d761b3 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/IOFunction.java
@@ -16,19 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.mergetree.compact;
+package org.apache.flink.table.store.utils;
 
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.SortedRun;
+import java.io.IOException;
 
-import java.util.List;
-
-/** Rewrite sections to the files. */
-public interface CompactRewriter {
-
-    CompactResult rewrite(int outputLevel, boolean dropDelete, 
List<List<SortedRun>> sections)
-            throws Exception;
+/**
+ * A functional interface for a {@link java.util.function.Function} that may 
throw {@link
+ * IOException}.
+ */
+@FunctionalInterface
+public interface IOFunction<T, R> {
 
-    CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
+    R apply(T value) throws IOException;
 }
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
index b14cc55b..6eb7cc15 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
@@ -21,6 +21,17 @@ package org.apache.flink.table.store.utils;
 /** Collection of simple mathematical routines. */
 public class MathUtils {
 
+    /**
+     * Decrements the given number down to the closest power of two. If the 
argument is a power of
+     * two, it remains unchanged.
+     *
+     * @param value The value to round down.
+     * @return The closest value that is a power of two and less or equal than 
the given value.
+     */
+    public static int roundDownToPowerOf2(int value) {
+        return Integer.highestOneBit(value);
+    }
+
     /**
      * Computes the logarithm of the given value to the base of 2. This method 
throws an error, if
      * the given argument is not a power of 2.
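
As a small aside, a sketch of how the new helper behaves; the inputs below are chosen only for illustration, and the HashLookupStoreFactoryTest later in this patch uses the same helper to turn an arbitrary byte count into a valid power-of-two page size.

import org.apache.flink.table.store.utils.MathUtils;

public class RoundDownSketch {

    public static void main(String[] args) {
        // Integer.highestOneBit keeps only the top set bit, i.e. rounds down to a power of two.
        System.out.println(MathUtils.roundDownToPowerOf2(1000)); // 512
        System.out.println(MathUtils.roundDownToPowerOf2(1024)); // 1024, already a power of two
    }
}
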
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
deleted file mode 100644
index 9960495e..00000000
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2015 LinkedIn Corp. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-
-package org.apache.flink.table.store.utils;
-
-/** A simple read buffer provides {@code readUnsignedByte} and position. */
-public final class SimpleReadBuffer {
-
-    private int pos = 0;
-    private final byte[] buf;
-
-    public SimpleReadBuffer(byte[] data) {
-        buf = data;
-    }
-
-    public byte[] getBuf() {
-        return buf;
-    }
-
-    public int getPos() {
-        return pos;
-    }
-
-    public SimpleReadBuffer reset() {
-        pos = 0;
-        return this;
-    }
-
-    public int readUnsignedByte() {
-        return buf[pos++] & 0xff;
-    }
-}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
index a6180a80..481a1597 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
@@ -17,7 +17,6 @@ package org.apache.flink.table.store.utils;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /** Utils for encoding int/long to var length bytes. */
 public final class VarLengthIntUtils {
@@ -99,17 +98,6 @@ public final class VarLengthIntUtils {
         return i;
     }
 
-    public static int decodeInt(SimpleReadBuffer is) {
-        for (int offset = 0, result = 0; offset < 32; offset += 7) {
-            int b = is.readUnsignedByte();
-            result |= (b & 0x7F) << offset;
-            if ((b & 0x80) == 0) {
-                return result;
-            }
-        }
-        throw new Error("Malformed integer.");
-    }
-
     public static int decodeInt(DataInput is) throws IOException {
         for (int offset = 0, result = 0; offset < 32; offset += 7) {
             int b = is.readUnsignedByte();
@@ -120,15 +108,4 @@ public final class VarLengthIntUtils {
         }
         throw new Error("Malformed integer.");
     }
-
-    public static int decodeInt(ByteBuffer bb) {
-        for (int offset = 0, result = 0; offset < 32; offset += 7) {
-            int b = bb.get() & 0xffff;
-            result |= (b & 0x7F) << offset;
-            if ((b & 0x80) == 0) {
-                return result;
-            }
-        }
-        throw new Error("Malformed integer.");
-    }
 }
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/io/cache/CachedRandomInputViewTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/io/cache/CachedRandomInputViewTest.java
new file mode 100644
index 00000000..e9e2f090
--- /dev/null
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/io/cache/CachedRandomInputViewTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.io.cache;
+
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.options.MemorySize;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CachedRandomInputView}. */
+public class CachedRandomInputViewTest {
+
+    @TempDir Path tempDir;
+
+    private final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+    @Test
+    public void testMatched() throws IOException {
+        innerTest(1024 * 512);
+    }
+
+    @Test
+    public void testNotMatched() throws IOException {
+        innerTest(131092);
+    }
+
+    @Test
+    public void testRandom() throws IOException {
+        innerTest(rnd.nextInt(5000, 100000));
+    }
+
+    private void innerTest(int len) throws IOException {
+        byte[] bytes = new byte[len];
+        MemorySegment segment = MemorySegment.wrap(bytes);
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = (byte) rnd.nextInt();
+        }
+
+        File file = writeFile(bytes);
+        CacheManager cacheManager = new CacheManager(1024, 
MemorySize.ofKibiBytes(128));
+        CachedRandomInputView view = new CachedRandomInputView(file, 
cacheManager);
+
+        // read first one
+        view.setReadPosition(0);
+        assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(0));
+
+        // read mid
+        int mid = bytes.length / 2;
+        view.setReadPosition(mid);
+        assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(mid));
+
+        // read special
+        view.setReadPosition(1021);
+        assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(1021));
+
+        // read last one
+        view.setReadPosition(bytes.length - 1);
+        assertThat(view.readByte()).isEqualTo(bytes[bytes.length - 1]);
+
+        // random read
+        for (int i = 0; i < 10000; i++) {
+            int position = rnd.nextInt(bytes.length - 8);
+            view.setReadPosition(position);
+            
assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(position));
+        }
+
+        view.close();
+        assertThat(cacheManager.cache().size()).isEqualTo(0);
+    }
+
+    private File writeFile(byte[] bytes) throws IOException {
+        File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
+        if (!file.createNewFile()) {
+            throw new IOException("Can not create: " + file);
+        }
+        Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
+        return file;
+    }
+}
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
index 6c76f425..fe058a94 100644
--- 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
@@ -23,7 +23,9 @@
 package org.apache.flink.table.store.lookup.hash;
 
 import org.apache.flink.table.store.io.DataOutputSerializer;
+import org.apache.flink.table.store.io.cache.CacheManager;
 import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.utils.MathUtils;
 import org.apache.flink.table.store.utils.VarLengthIntUtils;
 
 import org.apache.commons.math3.random.RandomDataGenerator;
@@ -42,6 +44,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -57,7 +60,9 @@ public class HashLookupStoreFactoryTest {
 
     @BeforeEach
     public void setUp() throws IOException {
-        this.factory = new HashLookupStoreFactory(0.75d, true, 
MemorySize.ofMebiBytes(1024));
+        this.factory =
+                new HashLookupStoreFactory(
+                        new CacheManager(1024, MemorySize.ofMebiBytes(1)), 
0.75d);
         this.file = new File(tempDir.toFile(), UUID.randomUUID().toString());
         if (!file.createNewFile()) {
             throw new IOException("Can not create file: " + file);
@@ -171,7 +176,12 @@ public class HashLookupStoreFactoryTest {
         writeStore(file, keys, values);
 
         // Read
-        factory = new HashLookupStoreFactory(0.75d, true, new 
MemorySize(byteSize - 100));
+        factory =
+                new HashLookupStoreFactory(
+                        new CacheManager(
+                                MathUtils.roundDownToPowerOf2(byteSize - 100),
+                                MemorySize.ofMebiBytes(1)),
+                        0.75d);
         HashLookupStoreReader reader = factory.createReader(file);
         for (int i = 0; i < keys.length; i++) {
             
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
@@ -197,7 +207,12 @@ public class HashLookupStoreFactoryTest {
         writeStore(file, keys, values);
 
         // Read
-        factory = new HashLookupStoreFactory(0.75d, true, new 
MemorySize(byteSize + sizeSize + 3));
+        factory =
+                new HashLookupStoreFactory(
+                        new CacheManager(
+                                MathUtils.roundDownToPowerOf2(byteSize + 
sizeSize + 3),
+                                MemorySize.ofMebiBytes(1)),
+                        0.75d);
         HashLookupStoreReader reader = factory.createReader(file);
         for (int i = 0; i < keys.length; i++) {
             
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
@@ -250,21 +265,25 @@ public class HashLookupStoreFactoryTest {
     }
 
     @Test
-    public void testReadDisk() throws IOException {
-        Integer[] keys = generateIntKeys(10000);
+    public void testCacheExpiration() throws IOException {
+        int len = 1000;
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        Object[] keys = new Object[len];
+        Object[] values = new Object[len];
+        for (int i = 0; i < len; i++) {
+            keys[i] = rnd.nextInt();
+            values[i] = generateStringData(100);
+        }
 
         // Write
-        Object[] values = generateStringData(keys.length, 1000);
         writeStore(file, keys, values);
 
         // Read
-        factory = new HashLookupStoreFactory(0.75d, false, 
MemorySize.ofMebiBytes(1024));
+        factory = new HashLookupStoreFactory(new CacheManager(1024, new 
MemorySize(8096)), 0.75d);
         HashLookupStoreReader reader = factory.createReader(file);
-
         for (int i = 0; i < keys.length; i++) {
             
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
         }
-        reader.close();
     }
 
     @Test
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 89395df8..6cb9ebe1 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -496,6 +496,34 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Define partition by table options, cannot define 
partition on DDL and table options at the same time.");
 
+    public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
+            key("lookup.hash-load-factor")
+                    .floatType()
+                    .defaultValue(0.75F)
+                    .withDescription("The index load factor for lookup.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_FILE_RETENTION =
+            key("lookup.cache-file-retention")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "The cached files retention time for lookup. After 
the file expires,"
+                                    + " if there is a need for access, it will 
be re-read from the DFS to build"
+                                    + " an index on the local disk.");
+
+    public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_DISK_SIZE =
+            key("lookup.cache-max-disk-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.MAX_VALUE)
+                    .withDescription(
+                            "Max disk size for lookup cache, you can use this 
option to limit the use of local disks.");
+
+    public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
+            key("lookup.cache-max-memory-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("256 mb"))
+                    .withDescription("Max memory size for lookup cache.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -885,7 +913,11 @@ public class CoreOptions implements Serializable {
                 "input",
                 "Double write to a changelog file when flushing memory table, 
the changelog is from input."),
 
-        FULL_COMPACTION("full-compaction", "Generate changelog files with each 
full compaction.");
+        FULL_COMPACTION("full-compaction", "Generate changelog files with each 
full compaction."),
+
+        LOOKUP(
+                "lookup",
+                "Generate changelog files through 'lookup' before committing the data write.");
 
         private final String value;
         private final String description;
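
A minimal sketch of how these options might be combined to enable the new producer through a table options map; the 'changelog-producer' key name is an assumption here (only the enum value 'lookup' is shown in this patch), and all values are illustrative rather than defaults:

    import java.util.HashMap;
    import java.util.Map;

    public class LookupChangelogOptionsSketch {
        static Map<String, String> lookupChangelogOptions() {
            Map<String, String> options = new HashMap<>();
            // Assumed key for selecting the changelog producer; 'lookup' is the new value.
            options.put("changelog-producer", "lookup");
            // Options introduced above; values are illustrative.
            options.put("lookup.hash-load-factor", "0.75");
            options.put("lookup.cache-file-retention", "1 h");
            options.put("lookup.cache-max-disk-size", "10 gb");
            options.put("lookup.cache-max-memory-size", "256 mb");
            return options;
        }
    }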
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index 3777e27e..132995e0 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.fs.FileIO;
 import org.apache.flink.table.store.utils.Preconditions;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -177,6 +178,9 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
         return toCompact;
     }
 
+    @Override
+    public void close() throws IOException {}
+
     /**
      * A {@link CompactTask} impl for full compaction of append-only table.
      *
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index e82e1748..982405d4 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.store.file.compact;
 
 import org.apache.flink.table.store.file.io.DataFileMeta;
 
+import java.io.Closeable;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
 /** Manager to submit compaction task. */
-public interface CompactManager {
+public interface CompactManager extends Closeable {
 
     /** Should wait compaction finish. */
     boolean shouldWaitCompaction();
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index 195b59de..3aff478d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.utils.Preconditions;
 
+import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
@@ -56,4 +57,7 @@ public class NoopCompactManager implements CompactManager {
 
     @Override
     public void cancelCompaction() {}
+
+    @Override
+    public void close() throws IOException {}
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/DataFileReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/DataFileReader.java
new file mode 100644
index 00000000..c7b53d26
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/DataFileReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.lookup.LookupStoreReader;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.utils.FileIOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/** A lookup reader over a local file built from a remote {@link DataFileMeta}. */
+public class DataFileReader implements Closeable {
+
+    private final File localFile;
+    private final DataFileMeta remoteFile;
+    private final LookupStoreReader reader;
+
+    public DataFileReader(File localFile, DataFileMeta remoteFile, 
LookupStoreReader reader) {
+        this.localFile = localFile;
+        this.remoteFile = remoteFile;
+        this.reader = reader;
+    }
+
+    @Nullable
+    public byte[] get(byte[] key) throws IOException {
+        return reader.lookup(key);
+    }
+
+    public int fileKibiBytes() {
+        long kibiBytes = localFile.length() >> 10;
+        if (kibiBytes > Integer.MAX_VALUE) {
+            throw new RuntimeException(
+                    "Lookup file is too big: " + 
MemorySize.ofKibiBytes(kibiBytes));
+        }
+        return (int) kibiBytes;
+    }
+
+    public DataFileMeta remoteFile() {
+        return remoteFile;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+        FileIOUtils.deleteFileOrDirectory(localFile);
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
index f295f456..3804e29b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
@@ -42,6 +43,8 @@ public class Levels {
 
     private final List<SortedRun> levels;
 
+    private final List<DropFileCallback> dropFileCallbacks = new ArrayList<>();
+
     public Levels(
             Comparator<InternalRow> keyComparator, List<DataFileMeta> 
inputFiles, int numLevels) {
         this.keyComparator = keyComparator;
@@ -83,6 +86,10 @@ public class Levels {
                 "Number of files stored in Levels does not equal to the size 
of inputFiles. This is unexpected.");
     }
 
+    public void addDropFileCallback(DropFileCallback callback) {
+        dropFileCallbacks.add(callback);
+    }
+
     public void addLevel0File(DataFileMeta file) {
         checkArgument(file.level() == 0);
         level0.add(file);
@@ -148,6 +155,16 @@ public class Levels {
                     groupedBefore.getOrDefault(i, emptyList()),
                     groupedAfter.getOrDefault(i, emptyList()));
         }
+
+        if (dropFileCallbacks.size() > 0) {
+            Set<String> droppedFiles =
+                    
before.stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
+            // exclude upgrade files
+            
after.stream().map(DataFileMeta::fileName).forEach(droppedFiles::remove);
+            for (DropFileCallback callback : dropFileCallbacks) {
+                droppedFiles.forEach(callback::notifyDropFile);
+            }
+        }
     }
 
     private void updateLevel(int level, List<DataFileMeta> before, 
List<DataFileMeta> after) {
@@ -170,4 +187,10 @@ public class Levels {
         return files.stream()
                 .collect(Collectors.groupingBy(DataFileMeta::level, 
Collectors.toList()));
     }
+
+    /** A callback to notify dropping file. */
+    public interface DropFileCallback {
+
+        void notifyDropFile(String file);
+    }
 }
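
A small sketch of the new DropFileCallback hook, assuming a Levels instance named 'levels' is already built; LookupLevels registers itself through this hook so that its cached local lookup files are invalidated when a compaction drops the underlying data file:

    // DropFileCallback has a single method, so a lambda is sufficient.
    Levels.DropFileCallback loggingCallback =
            fileName -> System.out.println("data file dropped from levels: " + fileName);
    levels.addDropFileCallback(loggingCallback);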
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
new file mode 100644
index 00000000..cc50f718
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.annotation.VisibleForTesting;
+import org.apache.flink.table.store.data.BinaryRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.io.DataOutputSerializer;
+import org.apache.flink.table.store.lookup.LookupStoreFactory;
+import org.apache.flink.table.store.lookup.LookupStoreReader;
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.reader.RecordReader;
+import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.utils.FileIOUtils;
+import org.apache.flink.table.store.utils.IOFunction;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import 
org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.store.options.ConfigOptions.key;
+
+/** Provides key lookup over {@link Levels}, backed by cached local lookup files. */
+public class LookupLevels implements Levels.DropFileCallback, Closeable {
+
+    private final Levels levels;
+    private final Comparator<InternalRow> keyComparator;
+    private final InternalRowSerializer keySerializer;
+    private final InternalRowSerializer valueSerializer;
+    private final IOFunction<DataFileMeta, RecordReader<KeyValue>> 
fileReaderFactory;
+    private final Supplier<File> localFileFactory;
+    private final LookupStoreFactory lookupStoreFactory;
+
+    private final Cache<String, LookupFile> lookupFiles;
+
+    public LookupLevels(
+            Levels levels,
+            Comparator<InternalRow> keyComparator,
+            InternalRowSerializer keySerializer,
+            InternalRowSerializer valueSerializer,
+            IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
+            Supplier<File> localFileFactory,
+            LookupStoreFactory lookupStoreFactory,
+            Duration fileRetention,
+            MemorySize maxDiskSize) {
+        this.levels = levels;
+        this.keyComparator = keyComparator;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.fileReaderFactory = fileReaderFactory;
+        this.localFileFactory = localFileFactory;
+        this.lookupStoreFactory = lookupStoreFactory;
+        this.lookupFiles =
+                CacheBuilder.newBuilder()
+                        .expireAfterAccess(fileRetention)
+                        .maximumWeight(maxDiskSize.getKibiBytes())
+                        .weigher(this::fileWeigh)
+                        .removalListener(this::removalCallback)
+                        .build();
+        levels.addDropFileCallback(this);
+    }
+
+    @VisibleForTesting
+    Cache<String, LookupFile> lookupFiles() {
+        return lookupFiles;
+    }
+
+    @Override
+    public void notifyDropFile(String file) {
+        lookupFiles.invalidate(file);
+    }
+
+    @Nullable
+    public KeyValue lookup(InternalRow key, int startLevel) throws IOException 
{
+        if (startLevel == 0) {
+            throw new IllegalArgumentException("Start level can not be zero.");
+        }
+
+        KeyValue kv = null;
+        for (int i = startLevel; i < levels.numberOfLevels(); i++) {
+            SortedRun level = levels.runOfLevel(i);
+            kv = lookup(key, level);
+            if (kv != null) {
+                break;
+            }
+        }
+
+        return kv;
+    }
+
+    @Nullable
+    private KeyValue lookup(InternalRow target, SortedRun level) throws 
IOException {
+        List<DataFileMeta> files = level.files();
+        int left = 0;
+        int right = files.size() - 1;
+
+        // binary search the files of this sorted run to find the first file whose
+        // maxKey is >= the target key
+        while (left < right) {
+            int mid = (left + right) / 2;
+
+            if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
+                // Key at "mid.max" is < "target".  Therefore all
+                // files at or before "mid" are uninteresting.
+                left = mid + 1;
+            } else {
+                // Key at "mid.max" is >= "target".  Therefore all files
+                // after "mid" are uninteresting.
+                right = mid;
+            }
+        }
+
+        int index = right;
+
+        // if the index is now pointing to the last file, check if the largest key in that file
+        // is less than the target key. If so, we need to seek beyond the end of this file.
+        if (index == files.size() - 1
+                && keyComparator.compare(files.get(index).maxKey(), target) < 
0) {
+            index++;
+        }
+
+        // if there is no next file, the key does not exist in this level
+        return index < files.size() ? lookup(target, files.get(index)) : null;
+    }
+
+    @Nullable
+    private KeyValue lookup(InternalRow key, DataFileMeta file) throws 
IOException {
+        LookupFile lookupFile;
+        try {
+            lookupFile = lookupFiles.get(file.fileName(), () -> 
createLookupFile(file));
+        } catch (ExecutionException e) {
+            throw new IOException(e);
+        }
+        byte[] keyBytes = keySerializer.toBinaryRow(key).toBytes();
+        byte[] valueBytes = lookupFile.get(keyBytes);
+        if (valueBytes == null) {
+            return null;
+        }
+        MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
+        long sequenceNumber = memorySegment.getLong(0);
+        RowKind rowKind = RowKind.fromByteValue(valueBytes[8]);
+        BinaryRow value = new BinaryRow(valueSerializer.getArity());
+        value.pointTo(memorySegment, 9, valueBytes.length - 9);
+        return new KeyValue()
+                .replace(key, sequenceNumber, rowKind, value)
+                .setLevel(lookupFile.remoteFile().level());
+    }
+
+    private int fileWeigh(String file, LookupFile lookupFile) {
+        return lookupFile.fileKibiBytes();
+    }
+
+    private void removalCallback(RemovalNotification<String, LookupFile> 
notification) {
+        LookupFile reader = notification.getValue();
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    private LookupFile createLookupFile(DataFileMeta file) throws IOException {
+        File localFile = localFileFactory.get();
+        if (!localFile.createNewFile()) {
+            throw new IOException("Can not create new file: " + localFile);
+        }
+        try (LookupStoreWriter kvWriter = 
lookupStoreFactory.createWriter(localFile);
+                RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) 
{
+            DataOutputSerializer valueOut = new DataOutputSerializer(32);
+            RecordReader.RecordIterator<KeyValue> batch;
+            KeyValue kv;
+            while ((batch = reader.readBatch()) != null) {
+                while ((kv = batch.next()) != null) {
+                    byte[] keyBytes = 
keySerializer.toBinaryRow(kv.key()).toBytes();
+                    valueOut.clear();
+                    valueOut.writeLong(kv.sequenceNumber());
+                    valueOut.writeByte(kv.valueKind().toByteValue());
+                    
valueOut.write(valueSerializer.toBinaryRow(kv.value()).toBytes());
+                    byte[] valueBytes = valueOut.getCopyOfBuffer();
+                    kvWriter.put(keyBytes, valueBytes);
+                }
+                batch.releaseBatch();
+            }
+        } catch (IOException e) {
+            FileIOUtils.deleteFileOrDirectory(localFile);
+            throw e;
+        }
+
+        return new LookupFile(localFile, file, 
lookupStoreFactory.createReader(localFile));
+    }
+
+    @Override
+    public void close() throws IOException {
+        lookupFiles.invalidateAll();
+    }
+
+    private static class LookupFile implements Closeable {
+
+        private final File localFile;
+        private final DataFileMeta remoteFile;
+        private final LookupStoreReader reader;
+
+        public LookupFile(File localFile, DataFileMeta remoteFile, 
LookupStoreReader reader) {
+            this.localFile = localFile;
+            this.remoteFile = remoteFile;
+            this.reader = reader;
+        }
+
+        @Nullable
+        public byte[] get(byte[] key) throws IOException {
+            return reader.lookup(key);
+        }
+
+        public int fileKibiBytes() {
+            long kibiBytes = localFile.length() >> 10;
+            if (kibiBytes > Integer.MAX_VALUE) {
+                throw new RuntimeException(
+                        "Lookup file is too big: " + 
MemorySize.ofKibiBytes(kibiBytes));
+            }
+            return (int) kibiBytes;
+        }
+
+        public DataFileMeta remoteFile() {
+            return remoteFile;
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+            FileIOUtils.deleteFileOrDirectory(localFile);
+        }
+    }
+}
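
For reference, the value bytes written by createLookupFile above use a fixed layout: an 8-byte sequence number, one RowKind byte, then the serialized value row. A standalone decoding sketch using plain JDK classes, for illustration only (the production lookup path reads the same bytes through MemorySegment, as shown above):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class LookupValueLayoutSketch {
        static void decode(byte[] valueBytes) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(valueBytes));
            long sequenceNumber = in.readLong();   // bytes [0, 8): sequence number
            byte rowKindByte = in.readByte();      // byte 8: RowKind byte value
            byte[] valueRowBytes =                 // bytes [9, end): serialized value row
                    Arrays.copyOfRange(valueBytes, 9, valueBytes.length);
            System.out.printf("seq=%d, rowKind=%d, value=%d bytes%n",
                    sequenceNumber, rowKindByte, valueRowBytes.length);
        }
    }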
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index b159b23e..28a1f742 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -277,6 +277,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         // cancel compaction so that it does not block job cancelling
         compactManager.cancelCompaction();
         sync();
+        compactManager.close();
 
         // delete temporary files
         List<DataFileMeta> delete = new ArrayList<>(newFiles);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
index 9881b0a5..8bff9e0f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -41,4 +42,7 @@ public abstract class AbstractCompactRewriter implements 
CompactRewriter {
                 .flatMap(Collection::stream)
                 .collect(Collectors.toList());
     }
+
+    @Override
+    public void close() throws IOException {}
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
index d2b62bd1..dd6cc8dd 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
@@ -22,10 +22,11 @@ import 
org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 
+import java.io.Closeable;
 import java.util.List;
 
 /** Rewrite sections to the files. */
-public interface CompactRewriter {
+public interface CompactRewriter extends Closeable {
 
     CompactResult rewrite(int outputLevel, boolean dropDelete, 
List<List<SortedRun>> sections)
             throws Exception;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 6fc064c4..5bbe3571 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 import org.apache.flink.table.store.utils.Preconditions;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 
@@ -65,4 +66,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
     protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
         return new FullChangelogMergeFunctionWrapper(mfFactory.create(), 
maxLevel);
     }
+
+    @Override
+    public void close() throws IOException {}
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupCompaction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupCompaction.java
new file mode 100644
index 00000000..acab8c3d
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupCompaction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class LookupCompaction implements CompactStrategy {
+
+    private final UniversalCompaction universalCompaction;
+
+    public LookupCompaction(UniversalCompaction universalCompaction) {
+        this.universalCompaction = universalCompaction;
+    }
+
+    @Override
+    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> 
runs) {
+        Optional<CompactUnit> pick = universalCompaction.pick(numLevels, runs);
+        if (pick.isPresent()) {
+            return pick;
+        }
+
+        if (runs.isEmpty() || runs.get(0).level() > 0) {
+            return Optional.empty();
+        }
+
+        // count all leading level 0 runs (each level 0 file is its own sorted run)
+        int candidateCount = 1;
+        for (int i = candidateCount; i < runs.size(); i++) {
+            if (runs.get(i).level() > 0) {
+                break;
+            }
+            candidateCount++;
+        }
+
+        return Optional.of(
+                universalCompaction.pickForSizeRatio(numLevels - 1, runs, 
candidateCount, true));
+    }
+}
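
The loop above forces every leading level-0 run into the picked unit whenever universal compaction declines to pick anything, so that level-0 files are always compacted and the lookup changelog is produced promptly. A trivial standalone illustration of that counting step, using plain ints in place of LevelSortedRun:

    public class LeadingLevel0Sketch {
        // Mirrors the candidateCount loop in LookupCompaction#pick: counts how many
        // runs at the head of the level-sorted run list sit at level 0.
        static int leadingLevel0Runs(int[] runLevels) {
            int count = 0;
            for (int level : runLevels) {
                if (level > 0) {
                    break;
                }
                count++;
            }
            return count;
        }

        public static void main(String[] args) {
            // three level-0 runs followed by runs at levels 3 and 5
            System.out.println(leadingLevel0Runs(new int[] {0, 0, 0, 3, 5})); // prints 3
        }
    }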
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeTreeCompactRewriter.java
similarity index 58%
copy from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
copy to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 6fc064c4..ce3fdbcf 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -23,46 +23,71 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
 import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
+import org.apache.flink.table.store.file.mergetree.LookupLevels;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
-import org.apache.flink.table.store.utils.Preconditions;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
 
-/** A {@link MergeTreeCompactRewriter} which produces changelog files for each 
full compaction. */
-public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
+/**
+ * A {@link MergeTreeCompactRewriter} which produces changelog files by looking up the compacted
+ * keys in deeper levels, for compactions involving level 0 files.
+ */
+public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter 
{
 
-    private final int maxLevel;
+    private final LookupLevels lookupLevels;
 
-    public FullChangelogMergeTreeCompactRewriter(
-            int maxLevel,
+    public LookupMergeTreeCompactRewriter(
+            LookupLevels lookupLevels,
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory) {
         super(readerFactory, writerFactory, keyComparator, mfFactory);
-        this.maxLevel = maxLevel;
+        this.lookupLevels = lookupLevels;
     }
 
     @Override
     protected boolean rewriteChangelog(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
-        boolean changelog = outputLevel == maxLevel;
-        if (changelog) {
-            Preconditions.checkArgument(
-                    dropDelete,
-                    "Delete records should be dropped from result of full 
compaction. This is unexpected.");
+        if (outputLevel == 0) {
+            return false;
+        }
+
+        for (List<SortedRun> runs : sections) {
+            for (SortedRun run : runs) {
+                for (DataFileMeta file : run.files()) {
+                    if (file.level() == 0) {
+                        return true;
+                    }
+                }
+            }
         }
-        return changelog;
+        return false;
     }
 
     @Override
     protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
-        return outputLevel == maxLevel;
+        return file.level() == 0;
     }
 
     @Override
     protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
-        return new FullChangelogMergeFunctionWrapper(mfFactory.create(), 
maxLevel);
+        return new LookupChangelogMergeFunctionWrapper(
+                mfFactory,
+                key -> {
+                    try {
+                        return lookupLevels.lookup(key, outputLevel + 1);
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                });
+    }
+
+    @Override
+    public void close() throws IOException {
+        lookupLevels.close();
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 99a09be0..58c02c4c 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -185,4 +186,9 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
                 });
         return result;
     }
+
+    @Override
+    public void close() throws IOException {
+        rewriter.close();
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index 933c7a4b..91e380ee 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -162,6 +162,7 @@ public class UniversalCompaction implements CompactStrategy 
{
         if (runCount == runs.size()) {
             outputLevel = maxLevel;
         } else {
+            // level of next run - 1
             outputLevel = Math.max(0, runs.get(runCount).level() - 1);
         }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 12a160b1..23cbbe6d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.NoopCompactManager;
@@ -28,10 +30,13 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
 import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
 import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.LookupLevels;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import 
org.apache.flink.table.store.file.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
+import org.apache.flink.table.store.file.mergetree.compact.LookupCompaction;
+import 
org.apache.flink.table.store.file.mergetree.compact.LookupMergeTreeCompactRewriter;
 import 
org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
 import 
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
 import 
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactRewriter;
@@ -43,6 +48,7 @@ import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.store.fs.FileIO;
+import org.apache.flink.table.store.lookup.hash.HashLookupStoreFactory;
 import org.apache.flink.table.store.types.RowType;
 
 import org.slf4j.Logger;
@@ -67,6 +73,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final MergeFunctionFactory<KeyValue> mfFactory;
     private final CoreOptions options;
     private final FileIO fileIO;
+    private final RowType keyType;
+    private final RowType valueType;
 
     public KeyValueFileStoreWrite(
             FileIO fileIO,
@@ -84,6 +92,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             KeyValueFieldsExtractor extractor) {
         super(commitUser, snapshotManager, scan, options);
         this.fileIO = fileIO;
+        this.keyType = keyType;
+        this.valueType = valueType;
         this.readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
                         fileIO,
@@ -147,17 +157,18 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 writerFactoryBuilder.build(partition, bucket, 
options.fileCompressionPerLevel());
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
+        UniversalCompaction universalCompaction =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent(),
+                        options.sortedRunSizeRatio(),
+                        options.numSortedRunCompactionTrigger(),
+                        options.maxSortedRunNum());
+        CompactStrategy compactStrategy =
+                options.changelogProducer() == ChangelogProducer.LOOKUP
+                        ? new LookupCompaction(universalCompaction)
+                        : universalCompaction;
         CompactManager compactManager =
-                createCompactManager(
-                        partition,
-                        bucket,
-                        new UniversalCompaction(
-                                options.maxSizeAmplificationPercent(),
-                                options.sortedRunSizeRatio(),
-                                options.numSortedRunCompactionTrigger(),
-                                options.maxSortedRunNum()),
-                        compactExecutor,
-                        levels);
+                createCompactManager(partition, bucket, compactStrategy, 
compactExecutor, levels);
         return new MergeTreeWriter(
                 bufferSpillable(),
                 options.localSortMaxNumFileHandles(),
@@ -185,7 +196,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             return new NoopCompactManager();
         } else {
             Comparator<InternalRow> keyComparator = 
keyComparatorSupplier.get();
-            CompactRewriter rewriter = createRewriter(partition, bucket, 
keyComparator);
+            CompactRewriter rewriter = createRewriter(partition, bucket, 
keyComparator, levels);
             return new MergeTreeCompactManager(
                     compactExecutor,
                     levels,
@@ -198,21 +209,47 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     }
 
     private MergeTreeCompactRewriter createRewriter(
-            BinaryRow partition, int bucket, Comparator<InternalRow> 
keyComparator) {
+            BinaryRow partition, int bucket, Comparator<InternalRow> 
keyComparator, Levels levels) {
         KeyValueFileReaderFactory readerFactory = 
readerFactoryBuilder.build(partition, bucket);
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, 
options.fileCompressionPerLevel());
+        switch (options.changelogProducer()) {
+            case FULL_COMPACTION:
+                return new FullChangelogMergeTreeCompactRewriter(
+                        options.numLevels() - 1,
+                        readerFactory,
+                        writerFactory,
+                        keyComparator,
+                        mfFactory);
+            case LOOKUP:
+                LookupLevels lookupLevels = createLookupLevels(levels, 
readerFactory);
+                return new LookupMergeTreeCompactRewriter(
+                        lookupLevels, readerFactory, writerFactory, 
keyComparator, mfFactory);
+            default:
+                return new MergeTreeCompactRewriter(
+                        readerFactory, writerFactory, keyComparator, 
mfFactory);
+        }
+    }
 
-        if (options.changelogProducer() == 
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            return new FullChangelogMergeTreeCompactRewriter(
-                    options.numLevels() - 1,
-                    readerFactory,
-                    writerFactory,
-                    keyComparator,
-                    mfFactory);
-        } else {
-            return new MergeTreeCompactRewriter(
-                    readerFactory, writerFactory, keyComparator, mfFactory);
+    private LookupLevels createLookupLevels(
+            Levels levels, KeyValueFileReaderFactory readerFactory) {
+        if (ioManager == null) {
+            throw new RuntimeException(
+                    "Cannot use lookup: there is no temp disk directory to use.");
         }
+        return new LookupLevels(
+                levels,
+                keyComparatorSupplier.get(),
+                new InternalRowSerializer(keyType),
+                new InternalRowSerializer(valueType),
+                file ->
+                        readerFactory.createRecordReader(
+                                file.schemaId(), file.fileName(), 
file.level()),
+                () -> ioManager.createChannel().getPathFile(),
+                new HashLookupStoreFactory(
+                        cacheManager,
+                        
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)),
+                
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+                
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
index 65788b4f..1032032c 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
@@ -24,12 +24,15 @@ import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.io.cache.CacheManager;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
 import java.util.Iterator;
 import java.util.Map;
 
+import static 
org.apache.flink.table.store.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE;
+
 /**
  * Base {@link FileStoreWrite} implementation which supports using shared 
memory and preempting
  * memory from other writers.
@@ -37,7 +40,9 @@ import java.util.Map;
  * @param <T> type of record to write.
  */
 public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T> {
-    private final MemoryPoolFactory memoryPoolFactory;
+
+    private final MemoryPoolFactory writeBufferPool;
+    protected final CacheManager cacheManager;
 
     public MemoryFileStoreWrite(
             String commitUser,
@@ -47,7 +52,11 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
         super(commitUser, snapshotManager, scan);
         HeapMemorySegmentPool memoryPool =
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize());
-        this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, 
this::memoryOwners);
+        this.writeBufferPool = new MemoryPoolFactory(memoryPool, 
this::memoryOwners);
+        this.cacheManager =
+                new CacheManager(
+                        options.pageSize(),
+                        
options.toConfiguration().get(LOOKUP_CACHE_MAX_MEMORY_SIZE));
     }
 
     private Iterator<MemoryOwner> memoryOwners() {
@@ -79,6 +88,6 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
                             + " but this is: "
                             + writer.getClass());
         }
-        memoryPoolFactory.notifyNewOwner((MemoryOwner) writer);
+        writeBufferPool.notifyNewOwner((MemoryOwner) writer);
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
index 4e6481d8..c4dc7878 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
@@ -85,12 +85,19 @@ public class SchemaValidation {
                         + " should not be larger than "
                         + SNAPSHOT_NUM_RETAINED_MAX.key());
 
-        // Only changelog tables with primary keys support full compaction
-        if (options.changelogProducer() == 
CoreOptions.ChangelogProducer.FULL_COMPACTION
-                && options.writeMode() == WriteMode.CHANGE_LOG
-                && schema.primaryKeys().isEmpty()) {
-            throw new UnsupportedOperationException(
-                    "Changelog table with full compaction must have primary 
keys");
+        // Only changelog tables with primary keys support full compaction or 
lookup changelog
+        // producer
+        if (options.writeMode() == WriteMode.CHANGE_LOG) {
+            switch (options.changelogProducer()) {
+                case FULL_COMPACTION:
+                case LOOKUP:
+                    if (schema.primaryKeys().isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "Changelog table with 'full-compaction' or 'lookup' changelog producer must have primary keys");
+                    }
+                    break;
+                default:
+            }
         }
 
         // Check column names in schema
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index db23df03..552f2f77 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueFileStore;
 import org.apache.flink.table.store.file.WriteMode;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.LookupMergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
 import 
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
@@ -77,7 +79,8 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
         if (lazyStore == null) {
             RowType rowType = tableSchema.logicalRowType();
             Options conf = Options.fromMap(tableSchema.options());
-            CoreOptions.MergeEngine mergeEngine = 
conf.get(CoreOptions.MERGE_ENGINE);
+            CoreOptions options = new CoreOptions(conf);
+            CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
             MergeFunctionFactory<KeyValue> mfFactory;
             switch (mergeEngine) {
                 case DEDUPLICATE:
@@ -102,7 +105,10 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                             "Unsupported merge engine: " + mergeEngine);
             }
 
-            CoreOptions options = new CoreOptions(conf);
+            if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
+                mfFactory = LookupMergeFunction.wrap(mfFactory);
+            }
+
             KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
             lazyStore =
                     new KeyValueFileStore(
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
index b92ab4ec..46922c4b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
@@ -29,8 +29,6 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.HashMap;
 
-import static 
org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-
 /** {@link DataTableScan} for streaming planning. */
 public interface StreamDataTableScan extends DataTableScan, 
InnerStreamTableScan {
 
@@ -52,13 +50,16 @@ public interface StreamDataTableScan extends DataTableScan, 
InnerStreamTableScan
                         put(CoreOptions.MergeEngine.AGGREGATE, 
"Pre-aggregate");
                     }
                 };
-        if (schema.primaryKeys().size() > 0
-                && mergeEngineDesc.containsKey(mergeEngine)
-                && options.changelogProducer() != FULL_COMPACTION) {
-            throw new RuntimeException(
-                    mergeEngineDesc.get(mergeEngine)
-                            + " continuous reading is not supported. "
-                            + "You can use full compaction changelog producer 
to support streaming reading.");
+        if (schema.primaryKeys().size() > 0 && 
mergeEngineDesc.containsKey(mergeEngine)) {
+            switch (options.changelogProducer()) {
+                case NONE:
+                case INPUT:
+                    throw new RuntimeException(
+                            mergeEngineDesc.get(mergeEngine)
+                                    + " continuous reading is not supported. 
You can use "
+                                    + "'lookup' or 'full-compaction' changelog 
producer to support streaming reading.");
+                default:
+            }
         }
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
index de8d2f0b..c0e61caa 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
@@ -149,18 +149,26 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
     private FollowUpScanner createFollowUpScanner() {
         CoreOptions.ChangelogProducer changelogProducer = 
options.changelogProducer();
         FollowUpScanner followUpScanner;
-        if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
-            followUpScanner = new DeltaFollowUpScanner();
-        } else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
-            followUpScanner = new InputChangelogFollowUpScanner();
-        } else if (changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            // this change in data split reader will affect both starting 
scanner and follow-up
-            // scanner
-            snapshotSplitReader.withLevelFilter(level -> level == 
options.numLevels() - 1);
-            followUpScanner = new CompactionChangelogFollowUpScanner();
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unknown changelog producer " + changelogProducer.name());
+        switch (changelogProducer) {
+            case NONE:
+                followUpScanner = new DeltaFollowUpScanner();
+                break;
+            case INPUT:
+                followUpScanner = new InputChangelogFollowUpScanner();
+                break;
+            case FULL_COMPACTION:
+                // this change in data split reader will affect both starting scanner
+                // and follow-up scanner
+                snapshotSplitReader.withLevelFilter(level -> level == 
options.numLevels() - 1);
+                followUpScanner = new CompactionChangelogFollowUpScanner();
+                break;
+            case LOOKUP:
+                // this change in data split reader will affect both starting scanner
+                // and follow-up scanner
+                snapshotSplitReader.withLevelFilter(level -> level > 0);
+                followUpScanner = new CompactionChangelogFollowUpScanner();
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown changelog producer " + 
changelogProducer.name());
         }
 
         Long boundedWatermark = options.scanBoundedWatermark();
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
new file mode 100644
index 00000000..d68d9e05
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.data.BinaryRow;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
+import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
+import org.apache.flink.table.store.file.io.RollingFileWriter;
+import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.fs.FileIOFinder;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.io.cache.CacheManager;
+import org.apache.flink.table.store.lookup.hash.HashLookupStoreFactory;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.table.SchemaEvolutionTableTestBase;
+import org.apache.flink.table.store.types.DataField;
+import org.apache.flink.table.store.types.DataTypes;
+import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.CoreOptions.TARGET_FILE_SIZE;
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link LookupLevels}. */
+public class LookupLevelsTest {
+
+    private static final String LOOKUP_FILE_PREFIX = "lookup-";
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private final Comparator<InternalRow> comparator = 
Comparator.comparingInt(o -> o.getInt(0));
+
+    private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", 
DataTypes.INT()));
+    private final RowType rowType =
+            DataTypes.ROW(
+                    DataTypes.FIELD(0, "key", DataTypes.INT()),
+                    DataTypes.FIELD(1, "value", DataTypes.INT()));
+
+    @Test
+    public void testMultiLevels() throws IOException {
+        Levels levels =
+                new Levels(
+                        comparator,
+                        Arrays.asList(
+                                newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
+                                newFile(2, kv(2, 22), kv(5, 55))),
+                        3);
+        LookupLevels lookupLevels = createLookupLevels(levels, MemorySize.ofMebiBytes(10));
+
+        // only in level 1
+        KeyValue kv = lookupLevels.lookup(row(1), 1);
+        assertThat(kv).isNotNull();
+        assertThat(kv.level()).isEqualTo(1);
+        assertThat(kv.value().getInt(1)).isEqualTo(11);
+
+        // only in level 2
+        kv = lookupLevels.lookup(row(2), 1);
+        assertThat(kv).isNotNull();
+        assertThat(kv.level()).isEqualTo(2);
+        assertThat(kv.value().getInt(1)).isEqualTo(22);
+
+        // both in level 1 and level 2
+        kv = lookupLevels.lookup(row(5), 1);
+        assertThat(kv).isNotNull();
+        assertThat(kv.level()).isEqualTo(1);
+        assertThat(kv.value().getInt(1)).isEqualTo(5);
+
+        // does not exist
+        kv = lookupLevels.lookup(row(4), 1);
+        assertThat(kv).isNull();
+
+        lookupLevels.close();
+        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testMultiFiles() throws IOException {
+        Levels levels =
+                new Levels(
+                        comparator,
+                        Arrays.asList(
+                                newFile(1, kv(1, 11), kv(2, 22)),
+                                newFile(1, kv(4, 44), kv(5, 55)),
+                                newFile(1, kv(7, 77), kv(8, 88)),
+                                newFile(1, kv(10, 1010), kv(11, 1111))),
+                        1);
+        LookupLevels lookupLevels = createLookupLevels(levels, MemorySize.ofMebiBytes(10));
+
+        Map<Integer, Integer> contains =
+                new HashMap<Integer, Integer>() {
+                    {
+                        this.put(1, 11);
+                        this.put(2, 22);
+                        this.put(4, 44);
+                        this.put(5, 55);
+                        this.put(7, 77);
+                        this.put(8, 88);
+                        this.put(10, 1010);
+                        this.put(11, 1111);
+                    }
+                };
+        for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
+            KeyValue kv = lookupLevels.lookup(row(entry.getKey()), 1);
+            assertThat(kv).isNotNull();
+            assertThat(kv.level()).isEqualTo(1);
+            assertThat(kv.value().getInt(1)).isEqualTo(entry.getValue());
+        }
+
+        int[] notContains = new int[] {0, 3, 6, 9, 12};
+        for (int key : notContains) {
+            KeyValue kv = lookupLevels.lookup(row(key), 1);
+            assertThat(kv).isNull();
+        }
+
+        lookupLevels.close();
+        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testMaxDiskSize() throws IOException {
+        List<DataFileMeta> files = new ArrayList<>();
+        int fileNum = 10;
+        int recordInFile = 100;
+        for (int i = 0; i < fileNum; i++) {
+            List<KeyValue> kvs = new ArrayList<>();
+            for (int j = 0; j < recordInFile; j++) {
+                int key = i * recordInFile + j;
+                kvs.add(kv(key, key));
+            }
+            files.add(newFile(1, kvs.toArray(new KeyValue[0])));
+        }
+        Levels levels = new Levels(comparator, files, 1);
+        LookupLevels lookupLevels = createLookupLevels(levels, MemorySize.ofKibiBytes(20));
+
+        for (int i = 0; i < fileNum * recordInFile; i++) {
+            KeyValue kv = lookupLevels.lookup(row(i), 1);
+            assertThat(kv).isNotNull();
+            assertThat(kv.level()).isEqualTo(1);
+            assertThat(kv.value().getInt(1)).isEqualTo(i);
+        }
+
+        // some files have been invalidated
+        long fileNumber = lookupLevels.lookupFiles().size();
+        String[] lookupFiles =
+                tempDir.toFile().list((dir, name) -> name.startsWith(LOOKUP_FILE_PREFIX));
+        assertThat(lookupFiles).isNotNull();
+        assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
+
+        lookupLevels.close();
+        assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+    }
+
+    private LookupLevels createLookupLevels(Levels levels, MemorySize maxDiskSize) {
+        return new LookupLevels(
+                levels,
+                comparator,
+                new InternalRowSerializer(keyType),
+                new InternalRowSerializer(rowType),
+                file -> createReaderFactory().createRecordReader(0, file.fileName(), file.level()),
+                () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
+                new HashLookupStoreFactory(new CacheManager(2048, MemorySize.ofMebiBytes(1)), 0.75),
+                Duration.ofHours(1),
+                maxDiskSize);
+    }
+
+    private KeyValue kv(int key, int value) {
+        return new KeyValue()
+                .replace(GenericRow.of(key), RowKind.INSERT, GenericRow.of(key, value));
+    }
+
+    private DataFileMeta newFile(int level, KeyValue... records) throws IOException {
+        RollingFileWriter<KeyValue, DataFileMeta> writer =
+                createWriterFactory().createRollingMergeTreeFileWriter(level);
+        for (KeyValue kv : records) {
+            writer.write(kv);
+        }
+        writer.close();
+        return writer.result().get(0);
+    }
+
+    private KeyValueFileWriterFactory createWriterFactory() {
+        Path path = new Path(tempDir.toUri().toString());
+        return KeyValueFileWriterFactory.builder(
+                        FileIOFinder.find(path),
+                        0,
+                        keyType,
+                        rowType,
+                        new FlushingFileFormat("avro"),
+                        new FileStorePathFactory(path),
+                        TARGET_FILE_SIZE.defaultValue().getBytes())
+                .build(BinaryRow.EMPTY_ROW, 0, null);
+    }
+
+    private KeyValueFileReaderFactory createReaderFactory() {
+        Path path = new Path(tempDir.toUri().toString());
+        KeyValueFileReaderFactory.Builder builder =
+                KeyValueFileReaderFactory.builder(
+                        FileIOFinder.find(path),
+                        createSchemaManager(path),
+                        0,
+                        keyType,
+                        rowType,
+                        ignore -> new FlushingFileFormat("avro"),
+                        new FileStorePathFactory(path),
+                        new KeyValueFieldsExtractor() {
+                            @Override
+                            public List<DataField> keyFields(TableSchema schema) {
+                                return keyType.getFields();
+                            }
+
+                            @Override
+                            public List<DataField> valueFields(TableSchema schema) {
+                                return schema.fields();
+                            }
+                        });
+        return builder.build(BinaryRow.EMPTY_ROW, 0);
+    }
+
+    private SchemaManager createSchemaManager(Path path) {
+        TableSchema tableSchema =
+                new TableSchema(
+                        0,
+                        rowType.getFields(),
+                        rowType.getFieldCount(),
+                        Collections.emptyList(),
+                        Collections.singletonList("key"),
+                        Collections.emptyMap(),
+                        "");
+        Map<Long, TableSchema> schemas = new HashMap<>();
+        schemas.put(tableSchema.id(), tableSchema);
+        return new SchemaEvolutionTableTestBase.TestingSchemaManager(path, schemas);
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 722bebff..e07bef95 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -110,6 +110,17 @@ public class FlinkConnectorOptions {
                                     + CoreOptions.ChangelogProducer.FULL_COMPACTION.name()
                                     + ", full compaction will be constantly triggered after this interval.");
 
+    public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
+            key("changelog-producer.lookup-wait")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When "
+                                    + CoreOptions.CHANGELOG_PRODUCER.key()
+                                    + " is set to "
+                                    + CoreOptions.ChangelogProducer.LOOKUP.name()
+                                    + ", commit will wait for changelog generation by lookup.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
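
For reference, a minimal sketch of how the new option pair is meant to be used,
modeled on the DDL in the ITCases later in this patch. The table name and option
values are illustrative; putting 'changelog-producer.lookup-wait' in the WITH clause
is an assumption based on FlinkSink reading it from the table options below.

    -- enable the lookup changelog producer on a primary-key table
    CREATE TABLE T (
        k INT,
        v STRING,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'bucket' = '2',
        'changelog-producer' = 'lookup',
        -- default 'true': the commit waits until lookup has produced the changelog
        'changelog-producer.lookup-wait' = 'true'
    );
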
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index c842183f..81e13809 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -31,8 +31,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
+import org.apache.flink.table.store.options.Options;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.utils.Preconditions;
 import org.apache.flink.util.function.SerializableFunction;
@@ -41,6 +41,7 @@ import java.io.Serializable;
 import java.util.UUID;
 
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
 
 /** Abstract sink of table store. */
 public abstract class FlinkSink implements Serializable {
@@ -59,26 +60,34 @@ public abstract class FlinkSink implements Serializable {
     }
 
     protected StoreSinkWrite.Provider createWriteProvider(String initialCommitUser) {
-        if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
-                && !table.options().writeOnly()) {
-            long fullCompactionThresholdMs =
-                    table.options()
-                            .toConfiguration()
-                            .get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
-                            .toMillis();
-            return (table, context, ioManager) ->
-                    new FullChangelogStoreSinkWrite(
-                            table,
-                            context,
-                            initialCommitUser,
-                            ioManager,
-                            isOverwrite,
-                            fullCompactionThresholdMs);
-        } else {
-            return (table, context, ioManager) ->
-                    new StoreSinkWriteImpl(
-                            table, context, initialCommitUser, ioManager, isOverwrite);
+        if (!table.options().writeOnly()) {
+            Options options = table.options().toConfiguration();
+            switch (table.options().changelogProducer()) {
+                case FULL_COMPACTION:
+                    long fullCompactionThresholdMs =
+                            options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
+                                    .toMillis();
+                    return (table, context, ioManager) ->
+                            new FullChangelogStoreSinkWrite(
+                                    table,
+                                    context,
+                                    initialCommitUser,
+                                    ioManager,
+                                    isOverwrite,
+                                    fullCompactionThresholdMs);
+                case LOOKUP:
+                    if (options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
+                        return (table, context, ioManager) ->
+                                new LookupChangelogStoreSinkWrite(
+                                        table, context, initialCommitUser, ioManager, isOverwrite);
+                    }
+                    break;
+                default:
+            }
         }
+
+        return (table, context, ioManager) ->
+                new StoreSinkWriteImpl(table, context, initialCommitUser, ioManager, isOverwrite);
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/LookupChangelogStoreSinkWrite.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/LookupChangelogStoreSinkWrite.java
new file mode 100644
index 00000000..4c64d1d7
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/LookupChangelogStoreSinkWrite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link StoreSinkWrite} for the {@link CoreOptions.ChangelogProducer#LOOKUP} changelog producer.
+ * This writer waits for compaction in {@link #prepareCommit}.
+ */
+public class LookupChangelogStoreSinkWrite extends StoreSinkWriteImpl {
+
+    public LookupChangelogStoreSinkWrite(
+            FileStoreTable table,
+            StateInitializationContext context,
+            String initialCommitUser,
+            IOManager ioManager,
+            boolean isOverwrite)
+            throws Exception {
+        super(table, context, initialCommitUser, ioManager, isOverwrite);
+    }
+
+    @Override
+    public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+            throws IOException {
+        return super.prepareCommit(true, checkpointId);
+    }
+}
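
A minimal sketch of the non-blocking variant, under the same assumption that the option
comes from the table options (see createWriteProvider above): when the wait is disabled,
the plain StoreSinkWriteImpl is chosen and changelog entries are produced asynchronously
by later lookup compactions. The table name and values are illustrative only.

    -- illustrative: commits return without waiting for changelog generation
    CREATE TABLE T_async (
        k INT,
        v STRING,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'changelog-producer' = 'lookup',
        'changelog-producer.lookup-wait' = 'false'
    );
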
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 6b03d548..fe52936b 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 import org.apache.flink.table.store.connector.util.AbstractTestBase;
 import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.fs.Path;
 import org.apache.flink.table.store.fs.local.LocalFileIO;
@@ -125,6 +126,10 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
         return sEnv.executeSql(String.format(query, args)).collect();
     }
 
+    protected BlockingIterator<Row, Row> streamSqlBlockIter(String query, Object... args) {
+        return BlockingIterator.of(sEnv.executeSql(String.format(query, args)).collect());
+    }
+
     protected CatalogTable table(String tableName) throws TableNotExistException {
         Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
         CatalogBaseTable table =
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index e57e6693..efedcf28 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -95,79 +97,10 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
 
     @Test
     public void testFullCompactionTriggerInterval() throws Exception {
-        TableEnvironment sEnv =
-                createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
-        sEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-
-        sEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='table-store', 'warehouse'='%s/warehouse')",
-                        path));
-        sEnv.executeSql("USE CATALOG testCatalog");
-        sEnv.executeSql(
-                "CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
-                        + "WITH ( "
-                        + "'bucket' = '2', "
-                        + "'changelog-producer' = 'full-compaction', "
-                        + "'changelog-producer.compaction-interval' = '1s' )");
-
-        Path inputPath = new Path(path, "input");
-        sEnv.executeSql(
-                "CREATE TABLE `default_catalog`.`default_database`.`S` ( i INT, g STRING ) "
-                        + "WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '"
-                        + inputPath
-                        + "', 'source.monitor-interval' = '500ms' )");
-
-        sEnv.executeSql(
-                "INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
-        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
-
-        // write initial data
-        sEnv.executeSql(
-                        "INSERT INTO `default_catalog`.`default_database`.`S` "
-                                + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')")
-                .await();
-
-        // read initial data
-        List<String> actual = new ArrayList<>();
-        for (int i = 0; i < 4; i++) {
-            actual.add(it.next().toString());
-        }
-        actual.sort(String::compareTo);
-
-        List<String> expected =
-                new ArrayList<>(Arrays.asList("+I[1, A]", "+I[2, B]", "+I[3, C]", "+I[4, D]"));
-        expected.sort(String::compareTo);
-        assertEquals(expected, actual);
-
-        // write update data
-        sEnv.executeSql(
-                        "INSERT INTO `default_catalog`.`default_database`.`S` "
-                                + "VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1, 'D')")
-                .await();
-
-        // read update data
-        actual.clear();
-        for (int i = 0; i < 8; i++) {
-            actual.add(it.next().toString());
-        }
-        actual.sort(String::compareTo);
-
-        expected =
-                new ArrayList<>(
-                        Arrays.asList(
-                                "-D[1, A]",
-                                "-U[2, B]",
-                                "+U[2, A]",
-                                "-U[3, C]",
-                                "+U[3, B]",
-                                "-U[4, D]",
-                                "+U[4, C]",
-                                "+I[5, D]"));
-        expected.sort(String::compareTo);
-        assertEquals(expected, actual);
+        innerTestChangelogProducing(
+                Arrays.asList(
+                        "'changelog-producer' = 'full-compaction'",
+                        "'changelog-producer.compaction-interval' = '1s'"));
     }
 
     @Test
@@ -233,6 +166,80 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
         it.close();
     }
 
+    @Test
+    public void testLookupChangelog() throws Exception {
+        innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
+    }
+
+    private void innerTestChangelogProducing(List<String> options) throws Exception {
+        TableEnvironment sEnv =
+                createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
+        sEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        sEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG testCatalog WITH ('type'='table-store', 'warehouse'='%s/warehouse')",
+                        path));
+        sEnv.executeSql("USE CATALOG testCatalog");
+        sEnv.executeSql(
+                "CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( "
+                        + "'bucket' = '2', "
+                        + String.join(",", options)
+                        + ")");
+
+        Path inputPath = new Path(path, "input");
+        sEnv.executeSql(
+                "CREATE TABLE `default_catalog`.`default_database`.`S` ( i INT, g STRING ) "
+                        + "WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '"
+                        + inputPath
+                        + "', 'source.monitor-interval' = '500ms' )");
+
+        sEnv.executeSql(
+                "INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+
+        // write initial data
+        sEnv.executeSql(
+                        "INSERT INTO `default_catalog`.`default_database`.`S` "
+                                + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')")
+                .await();
+
+        // read initial data
+        List<String> actual = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            actual.add(it.next().toString());
+        }
+
+        assertThat(actual)
+                .containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]", "+I[4, D]");
+
+        // write update data
+        sEnv.executeSql(
+                        "INSERT INTO `default_catalog`.`default_database`.`S` "
+                                + "VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1, 'D')")
+                .await();
+
+        // read update data
+        actual.clear();
+        for (int i = 0; i < 8; i++) {
+            actual.add(it.next().toString());
+        }
+
+        assertThat(actual)
+                .containsExactlyInAnyOrder(
+                        "-D[1, A]",
+                        "-U[2, B]",
+                        "+U[2, A]",
+                        "-U[3, C]",
+                        "+U[3, B]",
+                        "-U[4, D]",
+                        "+U[4, C]",
+                        "+I[5, D]");
+    }
+
     // ------------------------------------------------------------------------
     //  Random Tests
     // ------------------------------------------------------------------------
@@ -275,6 +282,29 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
         testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
     }
 
+    @Test
+    @Timeout(600000)
+    public void testLookupChangelogProducerBatchRandom() throws Exception {
+        TableEnvironment bEnv = createBatchTableEnvironment();
+        testLookupChangelogProducerRandom(bEnv, 1, false);
+    }
+
+    @Test
+    @Timeout(600000)
+    public void testLookupChangelogProducerStreamingRandom() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
+        testLookupChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
+    }
+
+    @Test
+    @Timeout(600000)
+    public void testStandAloneLookupJobRandom() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
+        testStandAloneLookupJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
+    }
+
     private static final int NUM_PARTS = 4;
     private static final int NUM_KEYS = 64;
     private static final int NUM_VALUES = 1024;
@@ -308,7 +338,28 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
         // if we can first read complete records then read incremental records correctly
         Thread.sleep(random.nextInt(5000));
 
-        checkFullCompactionTestResult(numProducers);
+        checkChangelogTestResult(numProducers);
+    }
+
+    private void testLookupChangelogProducerRandom(
+            TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        testRandom(
+                tEnv,
+                numProducers,
+                enableFailure,
+                "'bucket' = '4',"
+                        + String.format(
+                                "'write-buffer-size' = '%s',",
+                                random.nextBoolean() ? "512kb" : "1mb")
+                        + "'changelog-producer' = 'lookup'");
+
+        // sleep for a random amount of time to check
+        // if we can first read complete records then read incremental records correctly
+        Thread.sleep(random.nextInt(5000));
+
+        checkChangelogTestResult(numProducers);
     }
 
     private void testStandAloneFullCompactJobRandom(
@@ -343,10 +394,44 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
         // if we can first read complete records then read incremental records correctly
         Thread.sleep(random.nextInt(2500));
 
-        checkFullCompactionTestResult(numProducers);
+        checkChangelogTestResult(numProducers);
+    }
+
+    private void testStandAloneLookupJobRandom(
+            TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        testRandom(
+                tEnv,
+                numProducers,
+                false,
+                "'bucket' = '4',"
+                        + String.format(
+                                "'write-buffer-size' = '%s',",
+                                random.nextBoolean() ? "512kb" : "1mb")
+                        + "'changelog-producer' = 'lookup',"
+                        + "'write-only' = 'true'");
+
+        // sleep for a random amount of time to check
+        // if dedicated compactor job can find first snapshot to compact correctly
+        Thread.sleep(random.nextInt(2500));
+
+        for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
+            StreamExecutionEnvironment env =
+                    createStreamExecutionEnvironment(random.nextInt(1900) + 100);
+            env.setParallelism(2);
+            FlinkActions.compact(path, "default", "T").build(env);
+            env.executeAsync();
+        }
+
+        // sleep for a random amount of time to check
+        // if we can first read complete records then read incremental records correctly
+        Thread.sleep(random.nextInt(2500));
+
+        checkChangelogTestResult(numProducers);
     }
 
-    private void checkFullCompactionTestResult(int numProducers) throws Exception {
+    private void checkChangelogTestResult(int numProducers) throws Exception {
         TableEnvironment sEnv = createStreamingTableEnvironment(100);
         sEnv.getConfig()
                 .getConfiguration()
@@ -436,7 +521,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
         List<TableResult> results = new ArrayList<>();
 
         if (enableFailure) {
-            FailingFileIO.reset(failingName, 100, 1000);
+            FailingFileIO.reset(failingName, 100, 10000);
         }
         for (int i = 0; i < numProducers; i++) {
             // for the last `NUM_PARTS * NUM_KEYS` records, we update every key to a specific value
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupChangelogWithAggITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupChangelogWithAggITCase.java
new file mode 100644
index 00000000..a863e113
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupChangelogWithAggITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test Lookup changelog producer with aggregation tables. */
+public class LookupChangelogWithAggITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testMultipleCompaction() throws Exception {
+        sql(
+                "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ("
+                        + "'bucket'='3', "
+                        + "'changelog-producer'='lookup', "
+                        + "'merge-engine'='aggregation', "
+                        + "'fields.v.aggregate-function'='sum')");
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T");
+
+        sql("INSERT INTO T VALUES (1, 1), (2, 2)");
+        assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+
+        for (int i = 1; i < 5; i++) {
+            sql("INSERT INTO T VALUES (1, 1), (2, 2)");
+            assertThat(iterator.collect(4))
+                    .containsExactlyInAnyOrder(
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, i),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, 2 * i),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 1, i + 1),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 2, 2 * (i + 1)));
+        }
+
+        iterator.close();
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 76023c8f..4fd36130 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -466,7 +466,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         assertThatThrownBy(() -> sEnv.executeSql(query))
                 .hasRootCauseMessage(
                         "Partial update continuous reading is not supported. "
-                                + "You can use full compaction changelog producer to support streaming reading.");
+                                + "You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading.");
     }
 
     @Test
