This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ad5d876 [core] Introduce index metadata (#1344)
32ad5d876 is described below

commit 32ad5d87662324a5d17e6501195eef619872f803
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 12 15:53:49 2023 +0800

    [core] Introduce index metadata (#1344)
---
 .../java/org/apache/paimon/utils/IntFileUtils.java |  80 ++++++++++
 .../java/org/apache/paimon/utils/IntHashSet.java   |  30 +++-
 .../java/org/apache/paimon/utils/IntIterator.java  |  70 +++++++++
 .../org/apache/paimon/utils/IntHashSetTest.java    |   2 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |  16 ++
 .../src/main/java/org/apache/paimon/FileStore.java |   3 +
 .../src/main/java/org/apache/paimon/Snapshot.java  |  19 +++
 .../org/apache/paimon/index/HashIndexFile.java     |  62 ++++++++
 .../org/apache/paimon/index/IndexFileHandler.java  | 126 +++++++++++++++
 .../org/apache/paimon/index/IndexFileMeta.java     | 105 +++++++++++++
 .../paimon/index/IndexFileMetaSerializer.java      |  51 ++++++
 .../java/org/apache/paimon/io/IndexIncrement.java  |  64 ++++++++
 .../apache/paimon/manifest/IndexManifestEntry.java | 174 +++++++++++++++++++++
 .../manifest/IndexManifestEntrySerializer.java     |  71 +++++++++
 .../apache/paimon/manifest/IndexManifestFile.java  | 105 +++++++++++++
 .../paimon/operation/FileStoreCommitImpl.java      |  75 ++++++++-
 .../paimon/table/sink/CommitMessageImpl.java       |  39 ++++-
 .../paimon/table/sink/CommitMessageSerializer.java |   8 +-
 .../apache/paimon/utils/FileStorePathFactory.java  |  38 +++++
 .../test/java/org/apache/paimon/TestFileStore.java |  27 +++-
 .../org/apache/paimon/index/HashIndexFileTest.java |  75 +++++++++
 .../paimon/index/IndexFileMetaSerializerTest.java  |  44 +++---
 .../manifest/IndexManifestEntrySerializerTest.java |  55 +++++++
 .../apache/paimon/operation/FileDeletionTest.java  |   1 +
 .../paimon/operation/FileStoreCommitTest.java      |  97 ++++++++++++
 .../table/sink/CommitMessageSerializerTest.java    |  17 +-
 .../apache/paimon/utils/SnapshotManagerTest.java   |   2 +
 27 files changed, 1411 insertions(+), 45 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/IntFileUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/IntFileUtils.java
new file mode 100644
index 000000000..e1758dd74
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntFileUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/** File to store ints. */
+public class IntFileUtils {
+
+    public static IntIterator readInts(FileIO fileIO, Path path) throws 
IOException {
+        FastBufferedInputStream in = new 
FastBufferedInputStream(fileIO.newInputStream(path));
+        return new IntIterator() {
+
+            @Override
+            public int next() throws IOException {
+                return readInt(in);
+            }
+
+            @Override
+            public void close() throws IOException {
+                in.close();
+            }
+        };
+    }
+
+    public static void writeInts(FileIO fileIO, Path path, IntIterator input) 
throws IOException {
+        try (FastBufferedOutputStream out =
+                        new 
FastBufferedOutputStream(fileIO.newOutputStream(path, false));
+                IntIterator iterator = input) {
+            while (true) {
+                try {
+                    writeInt(out, iterator.next());
+                } catch (EOFException ignored) {
+                    break;
+                }
+            }
+        }
+    }
+
+    private static int readInt(FastBufferedInputStream in) throws IOException {
+        int ch1 = in.read();
+        int ch2 = in.read();
+        int ch3 = in.read();
+        int ch4 = in.read();
+        if ((ch1 | ch2 | ch3 | ch4) < 0) {
+            throw new EOFException();
+        }
+        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
+    }
+
+    private static void writeInt(FastBufferedOutputStream out, int v) throws 
IOException {
+        out.write((v >>> 24) & 0xFF);
+        out.write((v >>> 16) & 0xFF);
+        out.write((v >>> 8) & 0xFF);
+        out.write(v & 0xFF);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
index f5924b42d..2d73bc6d6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.utils;
 
-import it.unimi.dsi.fastutil.ints.IntArrays;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 
+import java.io.EOFException;
+
 /** A hash set for ints. */
 public class IntHashSet {
 
@@ -38,9 +39,28 @@ public class IntHashSet {
         set.add(value);
     }
 
-    public int[] toSortedInts() {
-        int[] ints = set.toIntArray();
-        IntArrays.stableSort(ints);
-        return ints;
+    public int size() {
+        return set.size();
+    }
+
+    public IntIterator toIntIterator() {
+        it.unimi.dsi.fastutil.ints.IntIterator iterator = set.intIterator();
+        return new IntIterator() {
+
+            @Override
+            public int next() throws EOFException {
+                if (!iterator.hasNext()) {
+                    throw new EOFException();
+                }
+                return iterator.nextInt();
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    public int[] toInts() {
+        return set.toIntArray();
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/IntIterator.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/IntIterator.java
new file mode 100644
index 000000000..f2ef79367
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Iterator for ints. */
+public interface IntIterator extends Closeable {
+
+    int next() throws IOException;
+
+    static int[] toInts(IntIterator input) {
+        return toIntList(input).stream().mapToInt(Integer::intValue).toArray();
+    }
+
+    static List<Integer> toIntList(IntIterator input) {
+        List<Integer> ints = new ArrayList<>();
+        try (IntIterator iterator = input) {
+            while (true) {
+                try {
+                    ints.add(iterator.next());
+                } catch (EOFException ignored) {
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        return ints;
+    }
+
+    static IntIterator create(int[] ints) {
+        return new IntIterator() {
+
+            int pos = -1;
+
+            @Override
+            public int next() throws EOFException {
+                if (pos >= ints.length - 1) {
+                    throw new EOFException();
+                }
+                return ints[++pos];
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java 
b/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
index c476ff563..25e3e033f 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
@@ -46,6 +46,6 @@ public class IntHashSetTest {
         values.forEach(set::add);
 
         int[] expected = 
values.stream().mapToInt(Integer::intValue).sorted().toArray();
-        assertThat(set.toSortedInts()).containsExactly(expected);
+        assertThat(set.toInts()).containsExactlyInAnyOrder(expected);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 26e711121..d1025706e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,9 @@ package org.apache.paimon;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.HashIndexFile;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.FileStoreCommitImpl;
@@ -116,6 +119,18 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 forWrite ? writeManifestCache : null);
     }
 
+    protected IndexManifestFile.Factory indexManifestFileFactory() {
+        return new IndexManifestFile.Factory(fileIO, options.manifestFormat(), 
pathFactory());
+    }
+
+    @Override
+    public IndexFileHandler newIndexFileHandler() {
+        return new IndexFileHandler(
+                snapshotManager(),
+                indexManifestFileFactory().create(),
+                new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
+    }
+
     @Override
     public RowType partitionType() {
         return partitionType;
@@ -137,6 +152,7 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 snapshotManager(),
                 manifestFileFactory(),
                 manifestListFactory(),
+                indexManifestFileFactory(),
                 newScan(),
                 options.bucket(),
                 options.manifestTargetSize(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1bc724a4b..d0d1b94c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon;
 
+import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreExpire;
@@ -51,6 +52,8 @@ public interface FileStore<T> extends Serializable {
 
     FileStoreScan newScan();
 
+    IndexFileHandler newIndexFileHandler();
+
     FileStoreRead<T> newRead();
 
     FileStoreWrite<T> newWrite(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 8c9139b94..8cb82922c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -73,6 +73,7 @@ public class Snapshot {
     private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
     private static final String FIELD_DELTA_MANIFEST_LIST = 
"deltaManifestList";
     private static final String FIELD_CHANGELOG_MANIFEST_LIST = 
"changelogManifestList";
+    private static final String FIELD_INDEX_MANIFEST = "indexManifest";
     private static final String FIELD_COMMIT_USER = "commitUser";
     private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
     private static final String FIELD_COMMIT_KIND = "commitKind";
@@ -110,6 +111,12 @@ public class Snapshot {
     @Nullable
     private final String changelogManifestList;
 
+    // a manifest recording all index files of this table
+    // null if no index file
+    @JsonProperty(FIELD_INDEX_MANIFEST)
+    @Nullable
+    private final String indexManifest;
+
     @JsonProperty(FIELD_COMMIT_USER)
     private final String commitUser;
 
@@ -164,6 +171,7 @@ public class Snapshot {
             String baseManifestList,
             String deltaManifestList,
             @Nullable String changelogManifestList,
+            @Nullable String indexManifest,
             String commitUser,
             long commitIdentifier,
             CommitKind commitKind,
@@ -180,6 +188,7 @@ public class Snapshot {
                 baseManifestList,
                 deltaManifestList,
                 changelogManifestList,
+                indexManifest,
                 commitUser,
                 commitIdentifier,
                 commitKind,
@@ -199,6 +208,7 @@ public class Snapshot {
             @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
             @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
             @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String 
changelogManifestList,
+            @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
             @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@@ -214,6 +224,7 @@ public class Snapshot {
         this.baseManifestList = baseManifestList;
         this.deltaManifestList = deltaManifestList;
         this.changelogManifestList = changelogManifestList;
+        this.indexManifest = indexManifest;
         this.commitUser = commitUser;
         this.commitIdentifier = commitIdentifier;
         this.commitKind = commitKind;
@@ -257,6 +268,12 @@ public class Snapshot {
         return changelogManifestList;
     }
 
+    @JsonGetter(FIELD_INDEX_MANIFEST)
+    @Nullable
+    public String indexManifest() {
+        return indexManifest;
+    }
+
     @JsonGetter(FIELD_COMMIT_USER)
     public String commitUser() {
         return commitUser;
@@ -397,6 +414,7 @@ public class Snapshot {
                 baseManifestList,
                 deltaManifestList,
                 changelogManifestList,
+                indexManifest,
                 commitUser,
                 commitIdentifier,
                 commitKind,
@@ -420,6 +438,7 @@ public class Snapshot {
                 && Objects.equals(baseManifestList, that.baseManifestList)
                 && Objects.equals(deltaManifestList, that.deltaManifestList)
                 && Objects.equals(changelogManifestList, 
that.changelogManifestList)
+                && Objects.equals(indexManifest, that.indexManifest)
                 && Objects.equals(commitUser, that.commitUser)
                 && commitIdentifier == that.commitIdentifier
                 && commitKind == that.commitKind
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java 
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
new file mode 100644
index 000000000..0c13b69ce
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import static org.apache.paimon.utils.IntFileUtils.readInts;
+import static org.apache.paimon.utils.IntFileUtils.writeInts;
+
+/** Hash index file contains ints. */
+public class HashIndexFile {
+
+    public static final String HASH_INDEX = "HASH";
+
+    private final FileIO fileIO;
+    private final PathFactory pathFactory;
+
+    public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+    }
+
+    public long fileSize(String fileName) {
+        try {
+            return fileIO.getFileSize(pathFactory.toPath(fileName));
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public IntIterator read(String fileName) throws IOException {
+        return readInts(fileIO, pathFactory.toPath(fileName));
+    }
+
+    public String write(IntIterator input) throws IOException {
+        Path path = pathFactory.newPath();
+        writeInts(fileIO, path, input);
+        return path.getName();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
new file mode 100644
index 000000000..725b9772d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+
+/** Handle index files. */
+public class IndexFileHandler {
+
+    private final SnapshotManager snapshotManager;
+    private final IndexManifestFile indexManifestFile;
+    private final HashIndexFile hashIndex;
+
+    public IndexFileHandler(
+            SnapshotManager snapshotManager,
+            IndexManifestFile indexManifestFile,
+            HashIndexFile hashIndex) {
+        this.snapshotManager = snapshotManager;
+        this.indexManifestFile = indexManifestFile;
+        this.hashIndex = hashIndex;
+    }
+
+    public Optional<IndexFileMeta> scan(
+            long snapshotId, String indexType, BinaryRow partition, int 
bucket) {
+        List<IndexManifestEntry> entries = scan(snapshotId, indexType, 
partition);
+        List<IndexManifestEntry> result = new ArrayList<>();
+        for (IndexManifestEntry file : entries) {
+            if (file.bucket() == bucket) {
+                result.add(file);
+            }
+        }
+        if (result.size() > 1) {
+            throw new IllegalArgumentException(
+                    "Find multiple index files for one bucket: " + result);
+        }
+        return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0).indexFile());
+    }
+
+    public List<IndexManifestEntry> scan(String indexType, BinaryRow 
partition) {
+        Long snapshot = snapshotManager.latestSnapshotId();
+        if (snapshot == null) {
+            return Collections.emptyList();
+        }
+
+        return scan(snapshot, indexType, partition);
+    }
+
+    public List<IndexManifestEntry> scan(long snapshotId, String indexType, 
BinaryRow partition) {
+        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+        String indexManifest = snapshot.indexManifest();
+        if (indexManifest == null) {
+            throw new IllegalArgumentException("Index manifest is null in 
snapshot: " + snapshot);
+        }
+
+        List<IndexManifestEntry> allFiles = 
indexManifestFile.read(indexManifest);
+        List<IndexManifestEntry> result = new ArrayList<>();
+        for (IndexManifestEntry file : allFiles) {
+            if (file.indexFile().indexType().equals(indexType)
+                    && file.partition().equals(partition)) {
+                result.add(file);
+            }
+        }
+
+        return result;
+    }
+
+    public List<Integer> readHashIndexList(IndexFileMeta file) {
+        return IntIterator.toIntList(readHashIndex(file));
+    }
+
+    public IntIterator readHashIndex(IndexFileMeta file) {
+        if (!file.indexType().equals(HASH_INDEX)) {
+            throw new IllegalArgumentException("Input file is not hash index: 
" + file.indexType());
+        }
+
+        try {
+            return hashIndex.read(file.fileName());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public IndexFileMeta writeHashIndex(int[] ints) {
+        return writeHashIndex(ints.length, IntIterator.create(ints));
+    }
+
+    public IndexFileMeta writeHashIndex(int size, IntIterator iterator) {
+        String file;
+        try {
+            file = hashIndex.write(iterator);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        return new IndexFileMeta(HASH_INDEX, file, hashIndex.fileSize(file), 
size);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
new file mode 100644
index 000000000..6fb3e176b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** Metadata of index file. */
+public class IndexFileMeta {
+
+    private final String indexType;
+    private final String fileName;
+    private final long fileSize;
+    private final long rowCount;
+
+    public IndexFileMeta(String indexType, String fileName, long fileSize, 
long rowCount) {
+        this.indexType = indexType;
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.rowCount = rowCount;
+    }
+
+    public String indexType() {
+        return indexType;
+    }
+
+    public String fileName() {
+        return fileName;
+    }
+
+    public long fileSize() {
+        return fileSize;
+    }
+
+    public long rowCount() {
+        return rowCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IndexFileMeta that = (IndexFileMeta) o;
+        return Objects.equals(indexType, that.indexType)
+                && Objects.equals(fileName, that.fileName)
+                && fileSize == that.fileSize
+                && rowCount == that.rowCount;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(indexType, fileName, fileSize, rowCount);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexManifestEntry{"
+                + "indexType="
+                + indexType
+                + ", fileName='"
+                + fileName
+                + '\''
+                + ", fileSize="
+                + fileSize
+                + ", rowCount="
+                + rowCount
+                + '}';
+    }
+
+    public static RowType schema() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false)));
+        fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
+        fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
+        fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+        return new RowType(fields);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
new file mode 100644
index 000000000..2813dd48b
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+/** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
+public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
+
+    public IndexFileMetaSerializer() {
+        super(IndexFileMeta.schema());
+    }
+
+    @Override
+    public InternalRow toRow(IndexFileMeta record) {
+        return GenericRow.of(
+                BinaryString.fromString(record.indexType()),
+                BinaryString.fromString(record.fileName()),
+                record.fileSize(),
+                record.rowCount());
+    }
+
+    @Override
+    public IndexFileMeta fromRow(InternalRow row) {
+        return new IndexFileMeta(
+                row.getString(0).toString(),
+                row.getString(1).toString(),
+                row.getLong(2),
+                row.getLong(3));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
new file mode 100644
index 000000000..a62d24109
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Incremental index files. */
+public class IndexIncrement {
+
+    private final List<IndexFileMeta> newIndexFiles;
+
+    public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
+        this.newIndexFiles = newIndexFiles;
+    }
+
+    public List<IndexFileMeta> newIndexFiles() {
+        return newIndexFiles;
+    }
+
+    public boolean isEmpty() {
+        return newIndexFiles.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IndexIncrement that = (IndexIncrement) o;
+        return Objects.equals(newIndexFiles, that.newIndexFiles);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(newIndexFiles);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexIncrement{" + "newIndexFiles=" + newIndexFiles + '}';
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
new file mode 100644
index 000000000..e2b136855
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** Manifest entry for index file. */
+public class IndexManifestEntry {
+
+    private final FileKind kind;
+    private final BinaryRow partition;
+    private final int bucket;
+    private final IndexFileMeta indexFile;
+
+    public IndexManifestEntry(
+            FileKind kind, BinaryRow partition, int bucket, IndexFileMeta 
indexFile) {
+        this.kind = kind;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.indexFile = indexFile;
+    }
+
+    public IndexManifestEntry toDeleteEntry() {
+        checkArgument(kind == FileKind.ADD);
+        return new IndexManifestEntry(FileKind.DELETE, partition, bucket, 
indexFile);
+    }
+
+    public FileKind kind() {
+        return kind;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public IndexFileMeta indexFile() {
+        return indexFile;
+    }
+
+    public static RowType schema() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
+        fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
+        fields.add(new DataField(2, "_BUCKET", new IntType(false)));
+        fields.add(new DataField(3, "_INDEX_TYPE", newStringType(false)));
+        fields.add(new DataField(4, "_FILE_NAME", newStringType(false)));
+        fields.add(new DataField(5, "_FILE_SIZE", new BigIntType(false)));
+        fields.add(new DataField(6, "_ROW_COUNT", new BigIntType(false)));
+        return new RowType(fields);
+    }
+
+    public Identifier identifier() {
+        return new Identifier(partition, bucket, indexFile.indexType());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IndexManifestEntry entry = (IndexManifestEntry) o;
+        return bucket == entry.bucket
+                && kind == entry.kind
+                && Objects.equals(partition, entry.partition)
+                && Objects.equals(indexFile, entry.indexFile);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, partition, bucket, indexFile);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexManifestEntry{"
+                + "kind="
+                + kind
+                + ", partition="
+                + partition
+                + ", bucket="
+                + bucket
+                + ", indexFile="
+                + indexFile
+                + '}';
+    }
+
+    /** The {@link Identifier} of a {@link IndexFileMeta}. */
+    public static class Identifier {
+
+        public final BinaryRow partition;
+        public final int bucket;
+        public final String indexType;
+
+        private Integer hash;
+
+        private Identifier(BinaryRow partition, int bucket, String indexType) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.indexType = indexType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Identifier that = (Identifier) o;
+            return bucket == that.bucket
+                    && Objects.equals(partition, that.partition)
+                    && Objects.equals(indexType, that.indexType);
+        }
+
+        @Override
+        public int hashCode() {
+            if (hash == null) {
+                hash = Objects.hash(partition, bucket, indexType);
+            }
+            return hash;
+        }
+
+        @Override
+        public String toString() {
+            return "Identifier{"
+                    + "partition="
+                    + partition
+                    + ", bucket="
+                    + bucket
+                    + ", indexType='"
+                    + indexType
+                    + '\''
+                    + '}';
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
new file mode 100644
index 000000000..40c4cb5a8
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** A {@link VersionedObjectSerializer} for {@link IndexManifestEntry}. */
+public class IndexManifestEntrySerializer extends 
VersionedObjectSerializer<IndexManifestEntry> {
+
+    public IndexManifestEntrySerializer() {
+        super(IndexManifestEntry.schema());
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public InternalRow convertTo(IndexManifestEntry record) {
+        IndexFileMeta indexFile = record.indexFile();
+        return GenericRow.of(
+                record.kind().toByteValue(),
+                serializeBinaryRow(record.partition()),
+                record.bucket(),
+                BinaryString.fromString(indexFile.indexType()),
+                BinaryString.fromString(indexFile.fileName()),
+                indexFile.fileSize(),
+                indexFile.rowCount());
+    }
+
+    @Override
+    public IndexManifestEntry convertFrom(int version, InternalRow row) {
+        if (version != 1) {
+            throw new UnsupportedOperationException("Unsupported version: " + 
version);
+        }
+
+        return new IndexManifestEntry(
+                FileKind.fromByteValue(row.getByte(0)),
+                deserializeBinaryRow(row.getBinary(1)),
+                row.getInt(2),
+                new IndexFileMeta(
+                        row.getString(3).toString(),
+                        row.getString(4).toString(),
+                        row.getLong(5),
+                        row.getLong(6)));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
new file mode 100644
index 000000000..8df6d67ec
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.IndexManifestEntry.Identifier;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Index manifest file. */
+public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
+
+    private IndexManifestFile(
+            FileIO fileIO,
+            FormatReaderFactory readerFactory,
+            FormatWriterFactory writerFactory,
+            PathFactory pathFactory) {
+        super(
+                fileIO,
+                new IndexManifestEntrySerializer(),
+                readerFactory,
+                writerFactory,
+                pathFactory,
+                null);
+    }
+
+    /** Merge new index files to index manifest. */
+    @Nullable
+    public String merge(
+            @Nullable String previousIndexManifest, List<IndexManifestEntry> 
newIndexFiles) {
+        String indexManifest = previousIndexManifest;
+        if (newIndexFiles.size() > 0) {
+            Map<Identifier, IndexManifestEntry> indexEntries = new 
LinkedHashMap<>();
+            List<IndexManifestEntry> entries =
+                    indexManifest == null ? new ArrayList<>() : 
read(indexManifest);
+            entries.addAll(newIndexFiles);
+            for (IndexManifestEntry file : entries) {
+                if (file.kind() == FileKind.ADD) {
+                    indexEntries.put(file.identifier(), file);
+                    if (file.indexFile().rowCount() == 0) {
+                        indexEntries.remove(file.identifier());
+                        
fileIO.deleteQuietly(pathFactory.toPath(file.indexFile().fileName()));
+                    }
+                } else {
+                    indexEntries.remove(file.identifier());
+                }
+            }
+            indexManifest = writeWithoutRolling(indexEntries.values());
+        }
+
+        return indexManifest;
+    }
+
+    /** Creator of {@link IndexManifestFile}. */
+    public static class Factory {
+
+        private final FileIO fileIO;
+        private final FileFormat fileFormat;
+        private final FileStorePathFactory pathFactory;
+
+        public Factory(FileIO fileIO, FileFormat fileFormat, 
FileStorePathFactory pathFactory) {
+            this.fileIO = fileIO;
+            this.fileFormat = fileFormat;
+            this.pathFactory = pathFactory;
+        }
+
+        public IndexManifestFile create() {
+            RowType schema = 
VersionedObjectSerializer.versionType(IndexManifestEntry.schema());
+            return new IndexManifestFile(
+                    fileIO,
+                    fileFormat.createReaderFactory(schema),
+                    fileFormat.createWriterFactory(schema),
+                    pathFactory.indexManifestFileFactory());
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index c7628e677..c7e242e80 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -27,6 +27,8 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
@@ -98,6 +100,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
+    private final IndexManifestFile indexManifestFile;
     private final FileStoreScan scan;
     private final int numBucket;
     private final MemorySize manifestTargetSize;
@@ -118,6 +121,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
+            IndexManifestFile.Factory indexManifestFileFactory,
             FileStoreScan scan,
             int numBucket,
             MemorySize manifestTargetSize,
@@ -134,6 +138,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
+        this.indexManifestFile = indexManifestFileFactory.create();
         this.scan = scan;
         this.numBucket = numBucket;
         this.manifestTargetSize = manifestTargetSize;
@@ -194,14 +199,19 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
                 compactTableFiles,
-                compactChangelog);
+                compactChangelog,
+                appendIndexFiles);
 
-        if (!ignoreEmptyCommit || !appendTableFiles.isEmpty() || 
!appendChangelog.isEmpty()) {
+        if (!ignoreEmptyCommit
+                || !appendTableFiles.isEmpty()
+                || !appendChangelog.isEmpty()
+                || !appendIndexFiles.isEmpty()) {
             // Optimization for common path.
             // Step 1:
             // Read manifest entries from changed partitions here and check 
for conflicts.
@@ -222,6 +232,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             tryCommit(
                     appendTableFiles,
                     appendChangelog,
+                    appendIndexFiles,
                     committable.identifier(),
                     committable.watermark(),
                     committable.logOffsets(),
@@ -246,6 +257,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             tryCommit(
                     compactTableFiles,
                     compactChangelog,
+                    Collections.emptyList(),
                     committable.identifier(),
                     committable.watermark(),
                     committable.logOffsets(),
@@ -271,12 +283,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
                 compactTableFiles,
-                compactChangelog);
+                compactChangelog,
+                appendIndexFiles);
 
         if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
             StringBuilder warnMessage =
@@ -337,6 +351,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             tryOverwrite(
                     partitionFilter,
                     appendTableFiles,
+                    appendIndexFiles,
                     committable.identifier(),
                     committable.watermark(),
                     committable.logOffsets());
@@ -346,6 +361,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             tryCommit(
                     compactTableFiles,
                     Collections.emptyList(),
+                    Collections.emptyList(),
                     committable.identifier(),
                     committable.watermark(),
                     committable.logOffsets(),
@@ -373,6 +389,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         tryOverwrite(
                 partitionFilter,
                 Collections.emptyList(),
+                Collections.emptyList(),
                 commitIdentifier,
                 null,
                 Collections.emptyMap());
@@ -406,7 +423,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             List<ManifestEntry> appendTableFiles,
             List<ManifestEntry> appendChangelog,
             List<ManifestEntry> compactTableFiles,
-            List<ManifestEntry> compactChangelog) {
+            List<ManifestEntry> compactChangelog,
+            List<IndexManifestEntry> appendIndexFiles) {
         for (CommitMessage message : commitMessages) {
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
             commitMessage
@@ -432,6 +450,17 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     .compactIncrement()
                     .changelogFiles()
                     .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, 
commitMessage, m)));
+            commitMessage
+                    .indexIncrement()
+                    .newIndexFiles()
+                    .forEach(
+                            f ->
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    f)));
         }
     }
 
@@ -443,6 +472,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private void tryCommit(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
+            List<IndexManifestEntry> indexFiles,
             long identifier,
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
@@ -453,6 +483,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             if (tryCommitOnce(
                     tableFiles,
                     changelogFiles,
+                    indexFiles,
                     identifier,
                     watermark,
                     logOffsets,
@@ -467,6 +498,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private void tryOverwrite(
             Predicate partitionFilter,
             List<ManifestEntry> changes,
+            List<IndexManifestEntry> indexFiles,
             long identifier,
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets) {
@@ -474,6 +506,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             Long latestSnapshotId = snapshotManager.latestSnapshotId();
 
             List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+            List<IndexManifestEntry> indexChangesWithOverwrite = new 
ArrayList<>();
             if (latestSnapshotId != null) {
                 List<ManifestEntry> currentEntries =
                         scan.withSnapshot(latestSnapshotId)
@@ -489,12 +522,29 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                                     entry.totalBuckets(),
                                     entry.file()));
                 }
+
+                // collect index files
+                Snapshot snapshot = snapshotManager.snapshot(latestSnapshotId);
+                if (snapshot.indexManifest() != null) {
+                    RowDataToObjectArrayConverter converter =
+                            new RowDataToObjectArrayConverter(partitionType);
+                    List<IndexManifestEntry> entries =
+                            indexManifestFile.read(snapshot.indexManifest());
+                    for (IndexManifestEntry entry : entries) {
+                        if (partitionFilter == null
+                                || 
partitionFilter.test(converter.convert(entry.partition()))) {
+                            
indexChangesWithOverwrite.add(entry.toDeleteEntry());
+                        }
+                    }
+                }
             }
             changesWithOverwrite.addAll(changes);
+            indexChangesWithOverwrite.addAll(indexFiles);
 
             if (tryCommitOnce(
                     changesWithOverwrite,
                     Collections.emptyList(),
+                    indexChangesWithOverwrite,
                     identifier,
                     watermark,
                     logOffsets,
@@ -510,6 +560,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     public boolean tryCommitOnce(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
+            List<IndexManifestEntry> indexFiles,
             long identifier,
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
@@ -545,12 +596,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         String previousChangesListName = null;
         String newChangesListName = null;
         String changelogListName = null;
+        String newIndexManifest = null;
         List<ManifestFileMeta> oldMetas = new ArrayList<>();
         List<ManifestFileMeta> newMetas = new ArrayList<>();
         List<ManifestFileMeta> changelogMetas = new ArrayList<>();
         try {
             long previousTotalRecordCount = 0L;
             Long currentWatermark = watermark;
+            String previousIndexManifest = null;
             if (latestSnapshot != null) {
                 previousTotalRecordCount = 
latestSnapshot.totalRecordCount(scan);
                 List<ManifestFileMeta> previousManifests =
@@ -567,6 +620,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                     ? latestWatermark
                                     : Math.max(currentWatermark, 
latestWatermark);
                 }
+                previousIndexManifest = latestSnapshot.indexManifest();
             }
             // merge manifest files with changes
             newMetas.addAll(
@@ -591,6 +645,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 changelogListName = manifestList.write(changelogMetas);
             }
 
+            // write new index manifest
+            String indexManifest = 
indexManifestFile.merge(previousIndexManifest, indexFiles);
+            if (!Objects.equals(indexManifest, previousIndexManifest)) {
+                newIndexManifest = indexManifest;
+            }
+
             // prepare snapshot file
             newSnapshot =
                     new Snapshot(
@@ -599,6 +659,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                             previousChangesListName,
                             newChangesListName,
                             changelogListName,
+                            indexManifest,
                             commitUser,
                             identifier,
                             commitKind,
@@ -614,6 +675,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     previousChangesListName,
                     newChangesListName,
                     changelogListName,
+                    newIndexManifest,
                     oldMetas,
                     newMetas,
                     changelogMetas);
@@ -695,6 +757,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 previousChangesListName,
                 newChangesListName,
                 changelogListName,
+                newIndexManifest,
                 oldMetas,
                 newMetas,
                 changelogMetas);
@@ -813,6 +876,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             String previousChangesListName,
             String newChangesListName,
             String changelogListName,
+            String newIndexManifest,
             List<ManifestFileMeta> oldMetas,
             List<ManifestFileMeta> newMetas,
             List<ManifestFileMeta> changelogMetas) {
@@ -826,6 +890,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         if (changelogListName != null) {
             manifestList.delete(changelogListName);
         }
+        if (newIndexManifest != null) {
+            indexManifestFile.delete(newIndexManifest);
+        }
         // clean up newly merged manifest files
         Set<ManifestFileMeta> oldMetaSet = new HashSet<>(oldMetas); // for 
faster searching
         for (ManifestFileMeta suspect : newMetas) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
index ee45f121a..1730fd7e6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
@@ -18,15 +18,18 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.io.NewFilesIncrement;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Collections;
 import java.util.Objects;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializedBytes;
@@ -44,16 +47,33 @@ public class CommitMessageImpl implements CommitMessage {
     private transient int bucket;
     private transient NewFilesIncrement newFilesIncrement;
     private transient CompactIncrement compactIncrement;
+    private transient IndexIncrement indexIncrement;
 
+    @VisibleForTesting
     public CommitMessageImpl(
             BinaryRow partition,
             int bucket,
             NewFilesIncrement newFilesIncrement,
             CompactIncrement compactIncrement) {
+        this(
+                partition,
+                bucket,
+                newFilesIncrement,
+                compactIncrement,
+                new IndexIncrement(Collections.emptyList()));
+    }
+
+    public CommitMessageImpl(
+            BinaryRow partition,
+            int bucket,
+            NewFilesIncrement newFilesIncrement,
+            CompactIncrement compactIncrement,
+            IndexIncrement indexIncrement) {
         this.partition = partition;
         this.bucket = bucket;
         this.newFilesIncrement = newFilesIncrement;
         this.compactIncrement = compactIncrement;
+        this.indexIncrement = indexIncrement;
     }
 
     @Override
@@ -74,8 +94,14 @@ public class CommitMessageImpl implements CommitMessage {
         return compactIncrement;
     }
 
+    public IndexIncrement indexIncrement() {
+        return indexIncrement;
+    }
+
     public boolean isEmpty() {
-        return newFilesIncrement.isEmpty() && compactIncrement.isEmpty();
+        return newFilesIncrement.isEmpty()
+                && compactIncrement.isEmpty()
+                && indexIncrement.isEmpty();
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -94,6 +120,7 @@ public class CommitMessageImpl implements CommitMessage {
         this.bucket = message.bucket;
         this.newFilesIncrement = message.newFilesIncrement;
         this.compactIncrement = message.compactIncrement;
+        this.indexIncrement = message.indexIncrement;
     }
 
     @Override
@@ -109,12 +136,13 @@ public class CommitMessageImpl implements CommitMessage {
         return bucket == that.bucket
                 && Objects.equals(partition, that.partition)
                 && Objects.equals(newFilesIncrement, that.newFilesIncrement)
-                && Objects.equals(compactIncrement, that.compactIncrement);
+                && Objects.equals(compactIncrement, that.compactIncrement)
+                && Objects.equals(indexIncrement, that.indexIncrement);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(partition, bucket, newFilesIncrement, 
compactIncrement);
+        return Objects.hash(partition, bucket, newFilesIncrement, 
compactIncrement, indexIncrement);
     }
 
     @Override
@@ -124,7 +152,8 @@ public class CommitMessageImpl implements CommitMessage {
                         + "partition = %s, "
                         + "bucket = %d, "
                         + "newFilesIncrement = %s, "
-                        + "compactIncrement = %s}",
-                partition, bucket, newFilesIncrement, compactIncrement);
+                        + "compactIncrement = %s, "
+                        + "indexIncrement = %s}",
+                partition, bucket, newFilesIncrement, compactIncrement, 
indexIncrement);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 38f993a72..da70c766d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -19,12 +19,14 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.index.IndexFileMetaSerializer;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.io.NewFilesIncrement;
 
 import java.io.ByteArrayOutputStream;
@@ -41,9 +43,11 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
     private static final int CURRENT_VERSION = 2;
 
     private final DataFileMetaSerializer dataFileSerializer;
+    private final IndexFileMetaSerializer indexEntrySerializer;
 
     public CommitMessageSerializer() {
         this.dataFileSerializer = new DataFileMetaSerializer();
+        this.indexEntrySerializer = new IndexFileMetaSerializer();
     }
 
     @Override
@@ -75,6 +79,7 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
         
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), 
view);
         
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), 
view);
         
dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(), 
view);
+        
indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(), 
view);
     }
 
     @Override
@@ -116,6 +121,7 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
                 new CompactIncrement(
                         dataFileSerializer.deserializeList(view),
                         dataFileSerializer.deserializeList(view),
-                        dataFileSerializer.deserializeList(view)));
+                        dataFileSerializer.deserializeList(view)),
+                new 
IndexIncrement(indexEntrySerializer.deserializeList(view)));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 304fbdb82..17d167d72 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -54,6 +54,8 @@ public class FileStorePathFactory {
 
     private final AtomicInteger manifestFileCount;
     private final AtomicInteger manifestListCount;
+    private final AtomicInteger indexManifestCount;
+    private final AtomicInteger indexFileCount;
 
     public FileStorePathFactory(Path root) {
         this(
@@ -74,6 +76,8 @@ public class FileStorePathFactory {
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
+        this.indexManifestCount = new AtomicInteger(0);
+        this.indexFileCount = new AtomicInteger(0);
     }
 
     public Path root() {
@@ -169,4 +173,38 @@ public class FileStorePathFactory {
             }
         };
     }
+
+    public PathFactory indexManifestFileFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return new Path(
+                        root
+                                + "/manifest/index-manifest-"
+                                + uuid
+                                + "-"
+                                + indexManifestCount.getAndIncrement());
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return new Path(root + "/manifest/" + fileName);
+            }
+        };
+    }
+
+    public PathFactory indexFileFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return new Path(
+                        root + "/index/index-" + uuid + "-" + 
indexFileCount.getAndIncrement());
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return new Path(root + "/index/" + fileName);
+            }
+        };
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 4552008e1..68ce6e190 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -23,7 +23,9 @@ import 
org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -58,6 +60,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -156,6 +159,7 @@ public class TestFileStore extends KeyValueFileStore {
                 false,
                 null,
                 watermark,
+                Collections.emptyList(),
                 (commit, committable) -> commit.commit(committable, 
Collections.emptyMap()));
     }
 
@@ -172,6 +176,7 @@ public class TestFileStore extends KeyValueFileStore {
                 false,
                 null,
                 null,
+                Collections.emptyList(),
                 (commit, committable) -> {
                     logOffsets.forEach(committable::addLogOffset);
                     commit.commit(committable, Collections.emptyMap());
@@ -191,6 +196,7 @@ public class TestFileStore extends KeyValueFileStore {
                 true,
                 null,
                 null,
+                Collections.emptyList(),
                 (commit, committable) ->
                         commit.overwrite(partition, committable, 
Collections.emptyMap()));
     }
@@ -212,6 +218,23 @@ public class TestFileStore extends KeyValueFileStore {
         return snapshotManager.snapshot(snapshotIdAfterCommit);
     }
 
+    public List<Snapshot> commitDataIndex(
+            KeyValue kv,
+            Function<KeyValue, BinaryRow> partitionCalculator,
+            int bucket,
+            IndexFileMeta... indexFiles)
+            throws Exception {
+        return commitDataImpl(
+                Collections.singletonList(kv),
+                partitionCalculator,
+                ignore -> bucket,
+                false,
+                null,
+                null,
+                Arrays.asList(indexFiles),
+                (commit, committable) -> commit.commit(committable, 
Collections.emptyMap()));
+    }
+
     public List<Snapshot> commitDataImpl(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRow> partitionCalculator,
@@ -219,6 +242,7 @@ public class TestFileStore extends KeyValueFileStore {
             boolean emptyWriter,
             Long identifier,
             Long watermark,
+            List<IndexFileMeta> indexFiles,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
         AbstractFileStoreWrite<KeyValue> write = newWrite();
@@ -262,7 +286,8 @@ public class TestFileStore extends KeyValueFileStore {
                                 entryWithPartition.getKey(),
                                 entryWithBucket.getKey(),
                                 increment.newFilesIncrement(),
-                                increment.compactIncrement()));
+                                increment.compactIncrement(),
+                                new IndexIncrement(indexFiles)));
             }
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexFileTest.java
new file mode 100644
index 000000000..9fcc459c3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexFileTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HashIndexFile}. */
+public class HashIndexFileTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    @Test
+    public void test() throws IOException {
+        Path dir = new Path(tempPath.toUri());
+        PathFactory pathFactory =
+                new PathFactory() {
+                    @Override
+                    public Path newPath() {
+                        return new Path(dir, UUID.randomUUID().toString());
+                    }
+
+                    @Override
+                    public Path toPath(String fileName) {
+                        return new Path(dir, fileName);
+                    }
+                };
+
+        HashIndexFile file = new HashIndexFile(LocalFileIO.create(), 
pathFactory);
+
+        Random rnd = new Random();
+        List<Integer> random = new ArrayList<>();
+        for (int i = 0; i < rnd.nextInt(100_000); i++) {
+            random.add(rnd.nextInt());
+        }
+
+        String name =
+                file.write(
+                        
IntIterator.create(random.stream().mapToInt(Integer::intValue).toArray()));
+
+        List<Integer> result = IntIterator.toIntList(file.read(name));
+        assertThat(result).containsExactlyInAnyOrderElementsOf(random);
+
+        assertThat(file.fileSize(name)).isEqualTo(random.size() * 4L);
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
similarity index 52%
copy from 
paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
copy to 
paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index c476ff563..8f1ef392a 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -16,36 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.utils;
+package org.apache.paimon.index;
 
-import org.junit.jupiter.api.Test;
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.ObjectSerializerTestBase;
 
-import java.util.HashSet;
 import java.util.Random;
-import java.util.Set;
 
-import static org.assertj.core.api.Assertions.assertThat;
+/** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
+public class IndexFileMetaSerializerTest extends 
ObjectSerializerTestBase<IndexFileMeta> {
 
-/** Test for {@link IntHashSet}. */
-public class IntHashSetTest {
-
-    @Test
-    public void testRandom() {
-        Set<Integer> values = new HashSet<>();
-        Random rnd = new Random();
-        for (int i = 0; i < rnd.nextInt(100); i++) {
-            values.add(rnd.nextInt());
-        }
-        if (rnd.nextBoolean()) {
-            values.add(0);
-            values.add(-1);
-            values.add(1);
-        }
+    @Override
+    protected ObjectSerializer<IndexFileMeta> serializer() {
+        return new IndexFileMetaSerializer();
+    }
 
-        IntHashSet set = new IntHashSet();
-        values.forEach(set::add);
+    @Override
+    protected IndexFileMeta object() {
+        return randomIndexFile();
+    }
 
-        int[] expected = 
values.stream().mapToInt(Integer::intValue).sorted().toArray();
-        assertThat(set.toSortedInts()).containsExactly(expected);
+    public static IndexFileMeta randomIndexFile() {
+        Random rnd = new Random();
+        return new IndexFileMeta(
+                HashIndexFile.HASH_INDEX,
+                "my_file_name" + rnd.nextLong(),
+                rnd.nextInt(),
+                rnd.nextInt());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
new file mode 100644
index 000000000..1ee3d6b5d
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.index.HashIndexFile;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.ObjectSerializerTestBase;
+
+import java.util.Random;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+
+/** Test for {@link IndexManifestEntrySerializer}. */
+public class IndexManifestEntrySerializerTest extends 
ObjectSerializerTestBase<IndexManifestEntry> {
+
+    @Override
+    protected ObjectSerializer<IndexManifestEntry> serializer() {
+        return new IndexManifestEntrySerializer();
+    }
+
+    @Override
+    protected IndexManifestEntry object() {
+        return randomIndexEntry();
+    }
+
+    public static IndexManifestEntry randomIndexEntry() {
+        Random rnd = new Random();
+        return new IndexManifestEntry(
+                rnd.nextBoolean() ? FileKind.ADD : FileKind.DELETE,
+                row(rnd.nextInt()),
+                rnd.nextInt(),
+                new IndexFileMeta(
+                        HashIndexFile.HASH_INDEX,
+                        "my_file_name" + rnd.nextLong(),
+                        rnd.nextInt(),
+                        rnd.nextInt()));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index c7469c735..d31332be6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -434,6 +434,7 @@ public class FileDeletionTest {
                 .tryCommitOnce(
                         delete,
                         Collections.emptyList(),
+                        Collections.emptyList(),
                         commitIdentifier++,
                         null,
                         Collections.emptyMap(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index fbcb4c054..e42dce33a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -26,6 +26,9 @@ import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -56,12 +59,14 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -501,6 +506,7 @@ public class FileStoreCommitTest {
                 false,
                 null,
                 null,
+                Collections.emptyList(),
                 (commit, committable) -> commit.commit(committable, 
Collections.emptyMap()));
         
assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id());
 
@@ -512,6 +518,7 @@ public class FileStoreCommitTest {
                 false,
                 null,
                 null,
+                Collections.emptyList(),
                 (commit, committable) -> {
                     commit.ignoreEmptyCommit(false);
                     commit.commit(committable, Collections.emptyMap());
@@ -533,6 +540,7 @@ public class FileStoreCommitTest {
                     false,
                     (long) i,
                     null,
+                    Collections.emptyList(),
                     (commit, committable) -> {
                         commit.commit(committable, Collections.emptyMap());
                         committables.add(committable);
@@ -679,6 +687,95 @@ public class FileStoreCommitTest {
                                 "Partitions list cannot be empty."));
     }
 
+    @Test
+    public void testIndexFiles() throws Exception {
+        TestFileStore store = createStore(false, 2);
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+
+        KeyValue record1 = gen.next();
+        BinaryRow part1 = gen.getPartition(record1);
+        KeyValue record2 = record1;
+        BinaryRow part2 = part1;
+        while (part1.equals(part2)) {
+            record2 = gen.next();
+            part2 = gen.getPartition(record2);
+        }
+
+        // init write
+        store.commitDataIndex(
+                record1,
+                gen::getPartition,
+                0,
+                indexFileHandler.writeHashIndex(new int[] {1, 2, 5}));
+        store.commitDataIndex(
+                record1, gen::getPartition, 1, 
indexFileHandler.writeHashIndex(new int[] {6, 8}));
+        store.commitDataIndex(
+                record2, gen::getPartition, 2, 
indexFileHandler.writeHashIndex(new int[] {3, 5}));
+
+        Snapshot snapshot = store.snapshotManager().latestSnapshot();
+
+        // assert part1
+        List<IndexManifestEntry> part1Index =
+                indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+        assertThat(part1Index.size()).isEqualTo(2);
+
+        assertThat(part1Index.get(0).bucket()).isEqualTo(0);
+        
assertThat(indexFileHandler.readHashIndexList(part1Index.get(0).indexFile()))
+                .containsExactlyInAnyOrder(1, 2, 5);
+
+        assertThat(part1Index.get(1).bucket()).isEqualTo(1);
+        
assertThat(indexFileHandler.readHashIndexList(part1Index.get(1).indexFile()))
+                .containsExactlyInAnyOrder(6, 8);
+
+        // assert part2
+        List<IndexManifestEntry> part2Index =
+                indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2);
+        assertThat(part2Index.size()).isEqualTo(1);
+        assertThat(part2Index.get(0).bucket()).isEqualTo(2);
+        
assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile()))
+                .containsExactlyInAnyOrder(3, 5);
+
+        // update part1
+        store.commitDataIndex(
+                record1, gen::getPartition, 0, 
indexFileHandler.writeHashIndex(new int[] {1, 4}));
+        snapshot = store.snapshotManager().latestSnapshot();
+
+        // assert update part1
+        part1Index = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+        assertThat(part1Index.size()).isEqualTo(2);
+
+        assertThat(part1Index.get(0).bucket()).isEqualTo(0);
+        
assertThat(indexFileHandler.readHashIndexList(part1Index.get(0).indexFile()))
+                .containsExactlyInAnyOrder(1, 4);
+
+        assertThat(part1Index.get(1).bucket()).isEqualTo(1);
+        
assertThat(indexFileHandler.readHashIndexList(part1Index.get(1).indexFile()))
+                .containsExactlyInAnyOrder(6, 8);
+
+        // assert scan one bucket
+        Optional<IndexFileMeta> file = indexFileHandler.scan(snapshot.id(), 
HASH_INDEX, part1, 0);
+        assertThat(file).isPresent();
+        
assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1,
 4);
+
+        // overwrite one partition
+        
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, 
true);
+        store.overwriteData(
+                Collections.singletonList(record1), gen::getPartition, kv -> 
0, new HashMap<>());
+        snapshot = store.snapshotManager().latestSnapshot();
+        file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0);
+        assertThat(file).isEmpty();
+        file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+        assertThat(file).isPresent();
+
+        // overwrite all partitions
+        
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, 
false);
+        store.overwriteData(
+                Collections.singletonList(record1), gen::getPartition, kv -> 
0, new HashMap<>());
+        snapshot = store.snapshotManager().latestSnapshot();
+        file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+        assertThat(file).isEmpty();
+    }
+
     private TestFileStore createStore(boolean failing) throws Exception {
         return createStore(failing, 1);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index d789d4176..a6cf04b0e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -19,12 +19,15 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.io.NewFilesIncrement;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 
+import static 
org.apache.paimon.index.IndexFileMetaSerializerTest.randomIndexFile;
 import static 
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
 import static 
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
 import static 
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
@@ -38,9 +41,15 @@ public class CommitMessageSerializerTest {
         CommitMessageSerializer serializer = new CommitMessageSerializer();
         NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
-        CommitMessage committable =
-                new CommitMessageImpl(row(0), 1, newFilesIncrement, 
compactIncrement);
-        CommitMessage newCommittable = serializer.deserialize(2, 
serializer.serialize(committable));
-        assertThat(newCommittable).isEqualTo(committable);
+        IndexIncrement indexIncrement =
+                new IndexIncrement(Arrays.asList(randomIndexFile(), 
randomIndexFile()));
+        CommitMessageImpl committable =
+                new CommitMessageImpl(
+                        row(0), 1, newFilesIncrement, compactIncrement, 
indexIncrement);
+        CommitMessageImpl newCommittable =
+                (CommitMessageImpl) serializer.deserialize(2, 
serializer.serialize(committable));
+        
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
+        
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
+        
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index a5735cc51..3052b093b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -66,6 +66,7 @@ public class SnapshotManagerTest {
                             null,
                             null,
                             null,
+                            null,
                             0L,
                             Snapshot.CommitKind.APPEND,
                             millis + i * 1000,
@@ -102,6 +103,7 @@ public class SnapshotManagerTest {
                             null,
                             null,
                             null,
+                            null,
                             0L,
                             Snapshot.CommitKind.APPEND,
                             i * 1000,


Reply via email to