This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b3eeea91e [core] Introduce file index read/write framework. (#3177)
b3eeea91e is described below

commit b3eeea91e8e1a862b375732333768f823c3fb770
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 15 18:54:33 2024 +0800

    [core] Introduce file index read/write framework. (#3177)
---
 .../shortcodes/generated/core_configuration.html   |  12 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  97 +++++++++++
 .../apache/paimon/fileindex/FileIndexFormat.java   |  17 +-
 .../apache/paimon/fileindex/FileIndexOptions.java  |  69 ++++++++
 .../bloomfilter/BloomFilterFileIndex.java          |  22 +--
 .../org/apache/paimon/utils/BloomFilter64.java     |  42 ++++-
 .../org/apache/paimon/AppendOnlyFileStore.java     |   6 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |   3 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java |   9 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    |  54 +++++-
 .../apache/paimon/io/DataFileMetaSerializer.java   |   6 +-
 .../org/apache/paimon/io/DataFilePathFactory.java  |   6 +
 .../apache/paimon/io/FileIndexRecordReader.java    |  90 ++++++++++
 .../java/org/apache/paimon/io/FileIndexWriter.java | 194 +++++++++++++++++++++
 .../org/apache/paimon/io/FileRecordReader.java     |  36 ++--
 .../apache/paimon/io/KeyValueDataFileWriter.java   |   4 +-
 .../org/apache/paimon/io/RowDataFileWriter.java    |  31 +++-
 .../apache/paimon/io/RowDataRollingFileWriter.java |   7 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |  45 ++++-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   9 +-
 .../apache/paimon/operation/RawFileSplitRead.java  |  94 +++++++---
 .../apache/paimon/stats/FieldStatsConverters.java  |  15 ++
 .../AppendOnlyTableCompactionCoordinatorTest.java  |   3 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   4 +-
 .../paimon/crosspartition/IndexBootstrapTest.java  |   3 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   4 +-
 .../paimon/io/DataFileTestDataGenerator.java       |   3 +-
 .../org/apache/paimon/io/DataFileTestUtils.java    |   9 +-
 .../apache/paimon/io/RollingFileWriterTest.java    |   4 +-
 .../ManifestCommittableSerializerTest.java         |   3 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   6 +-
 .../org/apache/paimon/mergetree/LevelsTest.java    |  14 +-
 .../mergetree/compact/IntervalPartitionTest.java   |   3 +-
 .../mergetree/compact/UniversalCompactionTest.java |   2 +-
 .../paimon/operation/ExpireSnapshotsTest.java      |   3 +-
 .../paimon/table/AppendOnlyFileStoreTableTest.java | 149 ++++++++++++++++
 .../paimon/table/source/SplitGeneratorTest.java    |   3 +-
 .../flink/UnawareBucketAppendOnlyTableITCase.java  |  12 +-
 .../sink/CompactionTaskSimpleSerializerTest.java   |   3 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |   3 +-
 .../source/FileStoreSourceSplitSerializerTest.java |   3 +-
 .../flink/source/TestChangelogDataReadWrite.java   |   3 +-
 42 files changed, 994 insertions(+), 111 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 8d39919d5..4c42bbfbb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -206,6 +206,18 @@ under the License.
             <td>Boolean</td>
             <td>Whether only overwrite dynamic partition when overwriting a 
partitioned table with dynamic partition columns. Works only when the table has 
partition keys.</td>
         </tr>
+        <tr>
+            <td><h5>file-index.in-manifest-threshold</h5></td>
+            <td style="word-wrap: break-word;">500 bytes</td>
+            <td>MemorySize</td>
+            <td>The threshold to store file index bytes in manifest.</td>
+        </tr>
+        <tr>
+            <td><h5>file-index.read.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether enabled read file index.</td>
+        </tr>
         <tr>
             <td><h5>file-reader-async-threshold</h5></td>
             <td style="word-wrap: break-word;">10 mb</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0d78d2ec8..b73133bae 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Documentation;
 import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
 import org.apache.paimon.annotation.Documentation.Immutable;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.lookup.LookupStrategy;
@@ -70,6 +71,10 @@ public class CoreOptions implements Serializable {
 
     public static final String DISTINCT = "distinct";
 
+    public static final String FILE_INDEX = "file-index";
+
+    public static final String COLUMNS = "columns";
+
     public static final ConfigOption<Integer> BUCKET =
             key("bucket")
                     .intType()
@@ -135,6 +140,18 @@ public class CoreOptions implements Serializable {
                             "Default file compression format, orc is lz4 and 
parquet is snappy. It can be overridden by "
                                     + FILE_COMPRESSION_PER_LEVEL.key());
 
+    public static final ConfigOption<MemorySize> 
FILE_INDEX_IN_MANIFEST_THRESHOLD =
+            key("file-index.in-manifest-threshold")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("500 B"))
+                    .withDescription("The threshold to store file index bytes 
in manifest.");
+
+    public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
+            key("file-index.read.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether enabled read file index.");
+
     public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
             key("manifest.format")
                     .enumType(FileFormatType.class)
@@ -1703,6 +1720,86 @@ public class CoreOptions implements Serializable {
         return options.get(DELETION_VECTORS_ENABLED);
     }
 
+    public FileIndexOptions indexColumnsOptions() {
+        String fileIndexPrefix = FILE_INDEX + ".";
+        String fileIndexColumnSuffix = "." + COLUMNS;
+
+        FileIndexOptions fileIndexOptions = new 
FileIndexOptions(fileIndexInManifestThreshold());
+        for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
+            String key = entry.getKey();
+            if (key.startsWith(fileIndexPrefix)) {
+                // start with file-index, decode this option
+                if (key.endsWith(fileIndexColumnSuffix)) {
+                    // if it ends with .columns, set up the indexes
+                    String indexType =
+                            key.substring(
+                                    fileIndexPrefix.length(),
+                                    key.length() - 
fileIndexColumnSuffix.length());
+                    String[] names = entry.getValue().split(",");
+                    for (String name : names) {
+                        if (StringUtils.isBlank(name)) {
+                            throw new IllegalArgumentException(
+                                    "Wrong option in " + key + ", should not 
have empty column");
+                        }
+                        fileIndexOptions.computeIfAbsent(name.trim(), 
indexType);
+                    }
+                } else {
+                    // else, it must be an option
+                    String[] kv = 
key.substring(fileIndexPrefix.length()).split("\\.");
+                    if (kv.length != 3) {
+                        continue;
+                    }
+                    String indexType = kv[0];
+                    String cname = kv[1];
+                    String opkey = kv[2];
+
+                    if (fileIndexOptions.get(cname, indexType) == null) {
+                        // if the indexes have not been set, find .columns in options, 
then set them
+                        String columns =
+                                options.get(fileIndexPrefix + indexType + 
fileIndexColumnSuffix);
+                        if (columns == null) {
+                            continue;
+                        }
+                        String[] names = columns.split(",");
+                        boolean foundTarget = false;
+                        for (String name : names) {
+                            if (StringUtils.isBlank(name)) {
+                                throw new IllegalArgumentException(
+                                        "Wrong option in "
+                                                + key
+                                                + ", should not have empty 
column");
+                            }
+                            String tname = name.trim();
+                            if (cname.equals(tname)) {
+                                foundTarget = true;
+                            }
+                            fileIndexOptions.computeIfAbsent(name.trim(), 
indexType);
+                        }
+                        if (!foundTarget) {
+                            throw new IllegalArgumentException(
+                                    "Wrong option in "
+                                            + key
+                                            + ", can't found column "
+                                            + cname
+                                            + " in "
+                                            + columns);
+                        }
+                    }
+                    fileIndexOptions.get(cname, indexType).set(opkey, 
entry.getValue());
+                }
+            }
+        }
+        return fileIndexOptions;
+    }
+
+    public long fileIndexInManifestThreshold() {
+        return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes();
+    }
+
+    public boolean fileIndexReadEnabled() {
+        return options.get(FILE_INDEX_READ_ENABLED);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", 
true, true),
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index ec809dff4..4f03faf90 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -46,12 +46,12 @@ import java.util.stream.Collectors;
  * File index file format. Put all column and offset in the header.
  *
  * <pre>
- * _______________________________________    _____________________
+ *   _____________________________________    _____________________
  * |     magic    |version|head length |
  * |-------------------------------------|
- * |              column size            |
+ * |            column number            |
  * |-------------------------------------|
- * |   column 1        |   index size   |
+ * |   column 1        | index number   |
  * |-------------------------------------|
  * |  index name 1 |start pos |length  |
  * |-------------------------------------|
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
  * |-------------------------------------|
  * |  index name 3 |start pos |length  |
  * |-------------------------------------|            HEAD
- * |   column 2        |   index size   |
+ * |   column 2        | index number   |
  * |-------------------------------------|
  * |  index name 1 |start pos |length  |
  * |-------------------------------------|
@@ -82,14 +82,15 @@ import java.util.stream.Collectors;
  * magic:                            8 bytes long
  * version:                          4 bytes int
  * head length:                      4 bytes int
- * index type:                       var bytes utf (length + bytes)
- * body info size:                   4 bytes int (how many column items below)
- * column name:                      var bytes utf
+ * column number:                    4 bytes int
+ * column x:                         var bytes utf (length + bytes)
+ * index number:                     4 bytes int (how many index items below)
+ * index name x:                     var bytes utf
  * start pos:                        4 bytes int
  * length:                           4 bytes int
  * redundant length:                 4 bytes int (for compatibility with later 
versions, in this version, content is zero)
  * redundant bytes:                  var bytes (for compatibility with later 
version, in this version, is empty)
- * BODY:                             column bytes + column bytes + column 
bytes + .......
+ * BODY:                             column index bytes + column index bytes + 
column index bytes + .......
  *
  * </pre>
  */
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
new file mode 100644
index 000000000..e8751245c
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Options of file index column. */
+public class FileIndexOptions {
+
+    // if the filter size is greater than fileIndexInManifestThreshold, we put it 
in a file
+    private final long fileIndexInManifestThreshold;
+
+    private final Map<String, Map<String, Options>> indexTypeOptions;
+
+    public FileIndexOptions() {
+        
this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
+    }
+
+    public FileIndexOptions(long fileIndexInManifestThreshold) {
+        this.indexTypeOptions = new HashMap<>();
+        this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
+    }
+
+    public void computeIfAbsent(String column, String indexType) {
+        indexTypeOptions
+                .computeIfAbsent(column, c -> new HashMap<>())
+                .computeIfAbsent(indexType, i -> new Options());
+    }
+
+    public Options get(String column, String indexType) {
+        return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null))
+                .map(x -> x.get(indexType))
+                .orElse(null);
+    }
+
+    public boolean isEmpty() {
+        return indexTypeOptions.isEmpty();
+    }
+
+    public long fileIndexInManifestThreshold() {
+        return fileIndexInManifestThreshold;
+    }
+
+    public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
+        return indexTypeOptions.entrySet();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
index c0486d535..37ba4d205 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -25,11 +25,10 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.BloomFilter64;
+import org.apache.paimon.utils.BloomFilter64.BitSet;
 
 import org.apache.hadoop.util.bloom.HashFunction;
 
-import java.util.BitSet;
-
 /**
  * Bloom filter for file index.
  *
@@ -40,7 +39,7 @@ import java.util.BitSet;
  */
 public class BloomFilterFileIndex implements FileIndexer {
 
-    public static final String BLOOM_FILTER = "bloom";
+    public static final String BLOOM_FILTER = "bloom-filter";
 
     private static final int DEFAULT_ITEMS = 1_000_000;
     private static final double DEFAULT_FPP = 0.1;
@@ -84,19 +83,21 @@ public class BloomFilterFileIndex implements FileIndexer {
 
         @Override
         public void write(Object key) {
-            filter.addHash(hashFunction.hash(key));
+            if (key != null) {
+                filter.addHash(hashFunction.hash(key));
+            }
         }
 
         @Override
         public byte[] serializedBytes() {
             int numHashFunctions = filter.getNumHashFunctions();
-            byte[] bytes = filter.getBitSet().toByteArray();
-            byte[] serialized = new byte[bytes.length + Integer.BYTES];
+            byte[] serialized = new byte[filter.getBitSet().bitSize() / 
Byte.SIZE + Integer.BYTES];
+            // big endian
             serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
             serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
             serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
             serialized[3] = (byte) (numHashFunctions & 0xFF);
-            System.arraycopy(bytes, 0, serialized, 4, bytes.length);
+            filter.getBitSet().toByteArray(serialized, 4, serialized.length - 
4);
             return serialized;
         }
     }
@@ -107,21 +108,20 @@ public class BloomFilterFileIndex implements FileIndexer {
         private final FastHash hashFunction;
 
         public Reader(DataType type, byte[] serializedBytes) {
+            // big endian
             int numHashFunctions =
                     ((serializedBytes[0] << 24)
                             + (serializedBytes[1] << 16)
                             + (serializedBytes[2] << 8)
                             + serializedBytes[3]);
-            byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
-            System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length);
-            BitSet bitSet = BitSet.valueOf(bytes);
+            BitSet bitSet = new BitSet(serializedBytes, 4);
             this.filter = new BloomFilter64(numHashFunctions, bitSet);
             this.hashFunction = FastHash.getHashFunction(type);
         }
 
         @Override
         public Boolean visitEqual(FieldRef fieldRef, Object key) {
-            return filter.testHash(hashFunction.hash(key));
+            return key == null || filter.testHash(hashFunction.hash(key));
         }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
index 75b8661a2..5a4be5a45 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.utils;
 
-import java.util.BitSet;
-
 /** Bloom filter 64 handle 64 bits hash. */
 public final class BloomFilter64 {
 
@@ -29,15 +27,15 @@ public final class BloomFilter64 {
 
     public BloomFilter64(long items, double fpp) {
         int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
-        this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+        this.numBits = nb + (Byte.SIZE - (nb % Byte.SIZE));
         this.numHashFunctions =
                 Math.max(1, (int) Math.round((double) numBits / items * 
Math.log(2)));
-        this.bitSet = new BitSet(numBits);
+        this.bitSet = new BitSet(new byte[numBits / Byte.SIZE], 0);
     }
 
     public BloomFilter64(int numHashFunctions, BitSet bitSet) {
         this.numHashFunctions = numHashFunctions;
-        this.numBits = bitSet.size();
+        this.numBits = bitSet.bitSize();
         this.bitSet = bitSet;
     }
 
@@ -81,4 +79,38 @@ public final class BloomFilter64 {
     public BitSet getBitSet() {
         return bitSet;
     }
+
+    /** Bit set used for bloom filter 64. */
+    public static class BitSet {
+
+        private static final byte MAST = 0x07;
+
+        private final byte[] data;
+        private final int offset;
+
+        public BitSet(byte[] data, int offset) {
+            assert data.length > 0 : "data length is zero!";
+            assert offset >= 0 : "offset is negative!";
+            this.data = data;
+            this.offset = offset;
+        }
+
+        public void set(int index) {
+            data[(index >>> 3) + offset] |= (byte) ((byte) 1 << (index & 
MAST));
+        }
+
+        public boolean get(int index) {
+            return (data[(index >>> 3) + offset] & ((byte) 1 << (index & 
MAST))) != 0;
+        }
+
+        public int bitSize() {
+            return (data.length - offset) * Byte.SIZE;
+        }
+
+        public void toByteArray(byte[] bytes, int offset, int length) {
+            if (length >= 0) {
+                System.arraycopy(data, this.offset, bytes, offset, length);
+            }
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 186fd4799..99efac540 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -86,7 +86,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 schema,
                 rowType,
                 FileFormatDiscover.of(options),
-                pathFactory());
+                pathFactory(),
+                options.fileIndexReadEnabled());
     }
 
     @Override
@@ -145,7 +146,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 options.bucket(),
                 forWrite,
                 options.scanManifestParallelism(),
-                branchName);
+                branchName,
+                options.fileIndexReadEnabled());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 354cc6dda..aeb30731d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -138,7 +138,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 schema,
                 valueType,
                 FileFormatDiscover.of(options),
-                pathFactory());
+                pathFactory(),
+                options.fileIndexReadEnabled());
     }
 
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9d9f41897..0ae21e6b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.CompactIncrement;
@@ -78,6 +79,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     private SinkWriter sinkWriter;
     private final FieldStatsCollector.Factory[] statsCollectors;
     private final IOManager ioManager;
+    private final FileIndexOptions fileIndexOptions;
 
     private MemorySegmentPool memorySegmentPool;
     private MemorySize maxDiskSize;
@@ -100,7 +102,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
             String fileCompression,
             String spillCompression,
             FieldStatsCollector.Factory[] statsCollectors,
-            MemorySize maxDiskSize) {
+            MemorySize maxDiskSize,
+            FileIndexOptions fileIndexOptions) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -120,6 +123,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         this.ioManager = ioManager;
         this.statsCollectors = statsCollectors;
         this.maxDiskSize = maxDiskSize;
+        this.fileIndexOptions = fileIndexOptions;
 
         this.sinkWriter =
                 useWriteBuffer
@@ -246,7 +250,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
                 pathFactory,
                 seqNumCounter,
                 fileCompression,
-                statsCollectors);
+                statsCollectors,
+                fileIndexOptions);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 30dacd130..fdf7266bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -83,6 +83,9 @@ public class DataFileMeta {
     // We have to keep the compatibility.
     private final @Nullable Long deleteRowCount;
 
+    // file index filter bytes, if it is small, store in data file meta
+    private final @Nullable byte[] embeddedIndex;
+
     public static DataFileMeta forAppend(
             String fileName,
             long fileSize,
@@ -91,6 +94,28 @@ public class DataFileMeta {
             long minSequenceNumber,
             long maxSequenceNumber,
             long schemaId) {
+        return forAppend(
+                fileName,
+                fileSize,
+                rowCount,
+                rowStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                Collections.emptyList(),
+                null);
+    }
+
+    public static DataFileMeta forAppend(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            BinaryTableStats rowStats,
+            long minSequenceNumber,
+            long maxSequenceNumber,
+            long schemaId,
+            List<String> extraFiles,
+            @Nullable byte[] embeddedIndex) {
         return new DataFileMeta(
                 fileName,
                 fileSize,
@@ -103,7 +128,10 @@ public class DataFileMeta {
                 maxSequenceNumber,
                 schemaId,
                 DUMMY_LEVEL,
-                0L);
+                extraFiles,
+                
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+                0L,
+                embeddedIndex);
     }
 
     public DataFileMeta(
@@ -118,7 +146,8 @@ public class DataFileMeta {
             long maxSequenceNumber,
             long schemaId,
             int level,
-            @Nullable Long deleteRowCount) {
+            @Nullable Long deleteRowCount,
+            @Nullable byte[] embeddedIndex) {
         this(
                 fileName,
                 fileSize,
@@ -133,7 +162,8 @@ public class DataFileMeta {
                 level,
                 Collections.emptyList(),
                 
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
-                deleteRowCount);
+                deleteRowCount,
+                embeddedIndex);
     }
 
     public DataFileMeta(
@@ -150,12 +180,14 @@ public class DataFileMeta {
             int level,
             List<String> extraFiles,
             Timestamp creationTime,
-            @Nullable Long deleteRowCount) {
+            @Nullable Long deleteRowCount,
+            @Nullable byte[] embeddedIndex) {
         this.fileName = fileName;
         this.fileSize = fileSize;
 
         this.rowCount = rowCount;
 
+        this.embeddedIndex = embeddedIndex;
         this.minKey = minKey;
         this.maxKey = maxKey;
         this.keyStats = keyStats;
@@ -191,6 +223,10 @@ public class DataFileMeta {
         return Optional.ofNullable(deleteRowCount);
     }
 
+    public byte[] embeddedIndex() {
+        return embeddedIndex;
+    }
+
     public BinaryRow minKey() {
         return minKey;
     }
@@ -276,7 +312,8 @@ public class DataFileMeta {
                 newLevel,
                 extraFiles,
                 creationTime,
-                deleteRowCount);
+                deleteRowCount,
+                embeddedIndex);
     }
 
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -301,7 +338,8 @@ public class DataFileMeta {
                 level,
                 newExtraFiles,
                 creationTime,
-                deleteRowCount);
+                deleteRowCount,
+                embeddedIndex);
     }
 
     @Override
@@ -316,6 +354,7 @@ public class DataFileMeta {
         return Objects.equals(fileName, that.fileName)
                 && fileSize == that.fileSize
                 && rowCount == that.rowCount
+                && Objects.equals(embeddedIndex, that.embeddedIndex)
                 && Objects.equals(minKey, that.minKey)
                 && Objects.equals(maxKey, that.maxKey)
                 && Objects.equals(keyStats, that.keyStats)
@@ -335,6 +374,7 @@ public class DataFileMeta {
                 fileName,
                 fileSize,
                 rowCount,
+                embeddedIndex,
                 minKey,
                 maxKey,
                 keyStats,
@@ -358,6 +398,7 @@ public class DataFileMeta {
                 fileName,
                 fileSize,
                 rowCount,
+                embeddedIndex,
                 minKey,
                 maxKey,
                 keyStats,
@@ -387,6 +428,7 @@ public class DataFileMeta {
         fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, 
newStringType(false))));
         fields.add(new DataField(12, "_CREATION_TIME", 
DataTypes.TIMESTAMP_MILLIS()));
         fields.add(new DataField(13, "_DELETE_ROW_COUNT", new 
BigIntType(true)));
+        fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", 
newBytesType(true)));
         return new RowType(fields);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index f91e3d293..3de1fcac9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -54,7 +54,8 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 meta.level(),
                 toStringArrayData(meta.extraFiles()),
                 meta.creationTime(),
-                meta.deleteRowCount().orElse(null));
+                meta.deleteRowCount().orElse(null),
+                meta.embeddedIndex());
     }
 
     @Override
@@ -73,6 +74,7 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 row.getInt(10),
                 fromStringArrayData(row.getArray(11)),
                 row.getTimestamp(12, 3),
-                row.isNullAt(13) ? null : row.getLong(13));
+                row.isNullAt(13) ? null : row.getLong(13),
+                row.isNullAt(14) ? null : row.getBinary(14));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 385609de3..ef83e8598 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -34,6 +34,8 @@ public class DataFilePathFactory {
 
     public static final String CHANGELOG_FILE_PREFIX = "changelog-";
 
+    public static final String INDEX_PATH_SUFFIX = ".index";
+
     private final Path parent;
     private final String uuid;
 
@@ -70,6 +72,10 @@ public class DataFilePathFactory {
         return uuid;
     }
 
+    public static Path toFileIndexPath(Path filePath) {
+        return new Path(filePath.getParent(), filePath.getName() + 
INDEX_PATH_SUFFIX);
+    }
+
     public static String formatIdentifier(String fileName) {
         int index = fileName.lastIndexOf('.');
         if (index == -1) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
new file mode 100644
index 000000000..0a91d4f7f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexPredicate;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Reader that tests the file index in its constructor and skips filtered files. */
+public class FileIndexRecordReader implements RecordReader<InternalRow> {
+
+    private final RecordReader<InternalRow> reader;
+
+    public FileIndexRecordReader(
+            FileIO fileIO,
+            TableSchema dataSchema,
+            List<Predicate> dataFilter,
+            DataFilePathFactory dataFilePathFactory,
+            DataFileMeta file,
+            ConcatRecordReader.ReaderSupplier<InternalRow> readerSupplier)
+            throws IOException {
+        boolean filterThisFile = false;
+        if (dataFilter != null && !dataFilter.isEmpty()) {
+            List<String> indexFiles =
+                    file.extraFiles().stream()
+                            .filter(name -> 
name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+                            .collect(Collectors.toList());
+            if (!indexFiles.isEmpty()) {
+                if (indexFiles.size() > 1) {
+                    throw new RuntimeException(
+                            "Found more than one index file for one data file: 
"
+                                    + String.join(" and ", indexFiles));
+                }
+                // go to file index check
+                try (FileIndexPredicate predicate =
+                        new FileIndexPredicate(
+                                dataFilePathFactory.toPath(indexFiles.get(0)),
+                                fileIO,
+                                dataSchema.logicalRowType())) {
+                    if (!predicate.testPredicate(
+                            PredicateBuilder.and(dataFilter.toArray(new 
Predicate[0])))) {
+                        filterThisFile = true;
+                    }
+                }
+            }
+        }
+
+        this.reader = filterThisFile ? null : readerSupplier.get();
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        return reader == null ? null : reader.readBatch();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
new file mode 100644
index 000000000..6ba635c5e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Writes the file index to an index file or keeps it as embedded bytes. */
+public final class FileIndexWriter implements Closeable {
+
+    public static final FileIndexResult EMPTY_RESULT = 
FileIndexResult.of(null, null);
+
+    private final FileIO fileIO;
+
+    private final Path path;
+
+    // if the index is larger than fileIndexInManifestThreshold, it is put in a file
+    private final long inManifestThreshold;
+
+    private final List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+
+    private String resultFileName;
+
+    private byte[] embeddedIndexBytes;
+
+    public FileIndexWriter(
+            FileIO fileIO, Path path, RowType rowType, FileIndexOptions 
fileIndexOptions) {
+        this.fileIO = fileIO;
+        this.path = path;
+        List<DataField> fields = rowType.getFields();
+        Map<String, DataField> map = new HashMap<>();
+        Map<String, Integer> index = new HashMap<>();
+        fields.forEach(
+                dataField -> {
+                    map.put(dataField.name(), dataField);
+                    index.put(dataField.name(), 
rowType.getFieldIndex(dataField.name()));
+                });
+        for (Map.Entry<String, Map<String, Options>> entry : 
fileIndexOptions.entrySet()) {
+            String columnName = entry.getKey();
+            DataField field = map.get(columnName);
+            if (field == null) {
+                throw new IllegalArgumentException(columnName + " does not 
exist in column fields");
+            }
+            for (Map.Entry<String, Options> typeEntry : 
entry.getValue().entrySet()) {
+                String indexType = typeEntry.getKey();
+                indexMaintainers.add(
+                        new IndexMaintainer(
+                                columnName,
+                                indexType,
+                                FileIndexer.create(indexType, field.type(), 
typeEntry.getValue())
+                                        .createWriter(),
+                                InternalRow.createFieldGetter(
+                                        field.type(), index.get(columnName))));
+            }
+        }
+        this.inManifestThreshold = 
fileIndexOptions.fileIndexInManifestThreshold();
+    }
+
+    public void write(InternalRow row) {
+        indexMaintainers.forEach(indexMaintainer -> 
indexMaintainer.write(row));
+    }
+
+    @Override
+    public void close() throws IOException {
+        Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
+
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            indexMaps
+                    .computeIfAbsent(indexMaintainer.getColumnName(), k -> new 
HashMap<>())
+                    .put(indexMaintainer.getIndexType(), 
indexMaintainer.serializedBytes());
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (FileIndexFormat.Writer writer = 
FileIndexFormat.createWriter(baos)) {
+            writer.writeColumnIndexes(indexMaps);
+        }
+
+        if (baos.size() > inManifestThreshold) {
+            try (OutputStream outputStream = fileIO.newOutputStream(path, 
false)) {
+                outputStream.write(baos.toByteArray());
+            }
+            resultFileName = path.getName();
+        } else {
+            embeddedIndexBytes = baos.toByteArray();
+        }
+    }
+
+    public FileIndexResult result() {
+        return FileIndexResult.of(embeddedIndexBytes, resultFileName);
+    }
+
+    @Nullable
+    public static FileIndexWriter create(
+            FileIO fileIO, Path path, RowType rowType, FileIndexOptions 
fileIndexOptions) {
+        return fileIndexOptions.isEmpty()
+                ? null
+                : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
+    }
+
+    /** File index result. */
+    public interface FileIndexResult {
+
+        @Nullable
+        byte[] embeddedIndexBytes();
+
+        @Nullable
+        String independentIndexFile();
+
+        static FileIndexResult of(byte[] embeddedIndexBytes, String 
resultFileName) {
+            return new FileIndexResult() {
+
+                @Override
+                public byte[] embeddedIndexBytes() {
+                    return embeddedIndexBytes;
+                }
+
+                @Override
+                public String independentIndexFile() {
+                    return resultFileName;
+                }
+            };
+        }
+    }
+
+    /** Maintains one index type for one column. */
+    private static class IndexMaintainer {
+
+        private final String columnName;
+        private final String indexType;
+        private final org.apache.paimon.fileindex.FileIndexWriter 
fileIndexWriter;
+        private final InternalRow.FieldGetter getter;
+
+        public IndexMaintainer(
+                String columnName,
+                String indexType,
+                org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
+                InternalRow.FieldGetter getter) {
+            this.columnName = columnName;
+            this.indexType = indexType;
+            this.fileIndexWriter = fileIndexWriter;
+            this.getter = getter;
+        }
+
+        public void write(InternalRow row) {
+            fileIndexWriter.write(getter.getFieldOrNull(row));
+        }
+
+        public String getIndexType() {
+            return indexType;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public byte[] serializedBytes() {
+            return fileIndexWriter.serializedBytes();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
index 0eac2961a..1e12025ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
@@ -41,24 +41,6 @@ public class FileRecordReader implements 
RecordReader<InternalRow> {
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
 
-    public FileRecordReader(
-            FormatReaderFactory readerFactory,
-            FormatReaderFactory.Context context,
-            @Nullable int[] indexMapping,
-            @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
-            throws IOException {
-        try {
-            this.reader = readerFactory.createReader(context);
-        } catch (Exception e) {
-            FileUtils.checkExists(context.fileIO(), context.filePath());
-            throw e;
-        }
-        this.indexMapping = indexMapping;
-        this.partitionInfo = partitionInfo;
-        this.castMapping = castMapping;
-    }
-
     @Nullable
     @Override
     public RecordReader.RecordIterator<InternalRow> readBatch() throws 
IOException {
@@ -89,6 +71,24 @@ public class FileRecordReader implements 
RecordReader<InternalRow> {
         return iterator;
     }
 
+    public FileRecordReader(
+            FormatReaderFactory readerFactory,
+            FormatReaderFactory.Context context,
+            @Nullable int[] indexMapping,
+            @Nullable CastFieldGetter[] castMapping,
+            @Nullable PartitionInfo partitionInfo)
+            throws IOException {
+        try {
+            this.reader = readerFactory.createReader(context);
+        } catch (Exception e) {
+            FileUtils.checkExists(context.fileIO(), context.filePath());
+            throw e;
+        }
+        this.indexMapping = indexMapping;
+        this.partitionInfo = partitionInfo;
+        this.castMapping = castMapping;
+    }
+
     @Override
     public void close() throws IOException {
         reader.close();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index e2e5441f6..0f1223cca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -168,6 +168,8 @@ public class KeyValueDataFileWriter
                 maxSeqNumber,
                 schemaId,
                 level,
-                deleteRecordCount);
+                deleteRecordCount,
+                // TODO: enable file filter for primary key table (e.g. 
deletion table).
+                null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 3b085c645..ce441b279 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
@@ -32,8 +33,11 @@ import org.apache.paimon.utils.LongCounter;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.function.Function;
 
+import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+
 /**
  * A {@link StatsCollectingSingleFileWriter} to write data files containing 
{@link InternalRow}.
  * Also produces {@link DataFileMeta} after writing a file.
@@ -43,6 +47,7 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
     private final long schemaId;
     private final LongCounter seqNumCounter;
     private final FieldStatsArraySerializer statsArraySerializer;
+    @Nullable private final FileIndexWriter fileIndexWriter;
 
     public RowDataFileWriter(
             FileIO fileIO,
@@ -53,7 +58,8 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
             long schemaId,
             LongCounter seqNumCounter,
             String fileCompression,
-            FieldStatsCollector.Factory[] statsCollectors) {
+            FieldStatsCollector.Factory[] statsCollectors,
+            FileIndexOptions fileIndexOptions) {
         super(
                 fileIO,
                 factory,
@@ -66,17 +72,34 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
+        this.fileIndexWriter =
+                FileIndexWriter.create(
+                        fileIO, toFileIndexPath(path), writeSchema, 
fileIndexOptions);
     }
 
     @Override
     public void write(InternalRow row) throws IOException {
         super.write(row);
+        // add row to index if needed
+        if (fileIndexWriter != null) {
+            fileIndexWriter.write(row);
+        }
         seqNumCounter.add(1L);
     }
 
+    @Override
+    public void close() throws IOException {
+        if (fileIndexWriter != null) {
+            fileIndexWriter.close();
+        }
+        super.close();
+    }
+
     @Override
     public DataFileMeta result() throws IOException {
         BinaryTableStats stats = statsArraySerializer.toBinary(fieldStats());
+        FileIndexWriter.FileIndexResult indexResult =
+                fileIndexWriter == null ? FileIndexWriter.EMPTY_RESULT : 
fileIndexWriter.result();
         return DataFileMeta.forAppend(
                 path.getName(),
                 fileIO.getFileSize(path),
@@ -84,6 +107,10 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
                 stats,
                 seqNumCounter.getValue() - super.recordCount(),
                 seqNumCounter.getValue() - 1,
-                schemaId);
+                schemaId,
+                indexResult.independentIndexFile() == null
+                        ? Collections.emptyList()
+                        : 
Collections.singletonList(indexResult.independentIndexFile()),
+                indexResult.embeddedIndexBytes());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 153929a36..8c92f17c8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.avro.AvroFileFormat;
 import org.apache.paimon.fs.FileIO;
@@ -38,7 +39,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
             DataFilePathFactory pathFactory,
             LongCounter seqNumCounter,
             String fileCompression,
-            FieldStatsCollector.Factory[] statsCollectors) {
+            FieldStatsCollector.Factory[] statsCollectors,
+            FileIndexOptions fileIndexOptions) {
         super(
                 () ->
                         new RowDataFileWriter(
@@ -54,7 +56,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                                 schemaId,
                                 seqNumCounter,
                                 fileCompression,
-                                statsCollectors),
+                                statsCollectors,
+                                fileIndexOptions),
                 targetFileSize);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 866c87d75..baa2e9f4a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
@@ -31,15 +32,25 @@ import org.apache.paimon.stats.FieldStatsConverters;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
 
     private final FieldStatsConverters fieldStatsConverters;
 
+    private final boolean fileIndexReadEnabled;
+
     private Predicate filter;
 
+    // cache of the converted filter per schema id.
+    private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
+
     public AppendOnlyFileStoreScan(
             RowType partitionType,
             ScanBucketFilter bucketFilter,
@@ -51,7 +62,8 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
             int numOfBuckets,
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism,
-            String branchName) {
+            String branchName,
+            boolean fileIndexReadEnabled) {
         super(
                 partitionType,
                 bucketFilter,
@@ -66,6 +78,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 branchName);
         this.fieldStatsConverters =
                 new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), 
schema.id());
+        this.fileIndexReadEnabled = fileIndexReadEnabled;
     }
 
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -84,11 +97,13 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
         FieldStatsArraySerializer serializer =
                 fieldStatsConverters.getOrCreate(entry.file().schemaId());
         BinaryTableStats stats = entry.file().valueStats();
+
         return filter.test(
-                entry.file().rowCount(),
-                serializer.evolution(stats.minValues()),
-                serializer.evolution(stats.maxValues()),
-                serializer.evolution(stats.nullCounts(), 
entry.file().rowCount()));
+                        entry.file().rowCount(),
+                        serializer.evolution(stats.minValues()),
+                        serializer.evolution(stats.maxValues()),
+                        serializer.evolution(stats.nullCounts(), 
entry.file().rowCount()))
+                && (!fileIndexReadEnabled || 
testFileIndex(entry.file().embeddedIndex(), entry));
     }
 
     @Override
@@ -96,4 +111,24 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
         // We don't need to filter per-bucket entries here
         return entries;
     }
+
+    private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
+        if (embeddedIndexBytes == null) {
+            return true;
+        }
+
+        RowType dataRowType = 
scanTableSchema(entry.file().schemaId()).logicalRowType();
+
+        Predicate dataPredicate =
+                dataFilterMapping.computeIfAbsent(
+                        entry.file().schemaId(),
+                        id -> 
fieldStatsConverters.convertFilter(entry.file().schemaId(), filter));
+
+        try (FileIndexPredicate predicate =
+                new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
+            return predicate.testPredicate(dataPredicate);
+        } catch (IOException e) {
+            throw new RuntimeException("Exception happens while checking 
predicate.", e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index b3361b0df..fc3f2a3d6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
@@ -74,6 +75,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
     private final boolean spillable;
     private final MemorySize maxDiskSize;
     private final FieldStatsCollector.Factory[] statsCollectors;
+    private final FileIndexOptions fileIndexOptions;
 
     private boolean forceBufferSpill = false;
     private boolean skipCompaction;
@@ -109,6 +111,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
         this.maxDiskSize = options.writeBufferSpillDiskSize();
         this.statsCollectors =
                 StatsCollectorFactories.createStatsFactories(options, 
rowType.getFieldNames());
+        this.fileIndexOptions = options.indexColumnsOptions();
     }
 
     @Override
@@ -155,7 +158,8 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                 fileCompression,
                 spillCompression,
                 statsCollectors,
-                maxDiskSize);
+                maxDiskSize,
+                fileIndexOptions);
     }
 
     public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -174,7 +178,8 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                             pathFactory.createDataFilePathFactory(partition, 
bucket),
                             new 
LongCounter(toCompact.get(0).minSequenceNumber()),
                             fileCompression,
-                            statsCollectors);
+                            statsCollectors,
+                            fileIndexOptions);
             try {
                 rewriter.write(bucketReader(partition, 
bucket).read(toCompact));
             } finally {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index c801dcaa8..cf3b76e67 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.casting.CastFieldGetter;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -25,9 +26,11 @@ import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
 import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileIndexRecordReader;
 import org.apache.paimon.io.FileRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.partition.PartitionUtils;
@@ -68,7 +71,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     private final TableSchema schema;
     private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
-    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
+    private final Map<FormatKey, RawFileBulkFormatMapping> bulkFormatMappings;
+    private final boolean fileIndexReadEnabled;
 
     private int[][] projection;
 
@@ -80,13 +84,15 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
             TableSchema schema,
             RowType rowType,
             FileFormatDiscover formatDiscover,
-            FileStorePathFactory pathFactory) {
+            FileStorePathFactory pathFactory,
+            boolean fileIndexReadEnabled) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schema = schema;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
         this.bulkFormatMappings = new HashMap<>();
+        this.fileIndexReadEnabled = fileIndexReadEnabled;
 
         this.projection = Projection.range(0, 
rowType.getFieldCount()).toNestedIndexes();
     }
@@ -121,7 +127,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
 
         for (DataFileMeta file : split.dataFiles()) {
             String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
-            BulkFormatMapping bulkFormatMapping =
+            RawFileBulkFormatMapping bulkFormatMapping =
                     bulkFormatMappings.computeIfAbsent(
                             new FormatKey(file.schemaId(), formatIdentifier),
                             this::createBulkFormatMapping);
@@ -140,7 +146,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
         return ConcatRecordReader.create(suppliers);
     }
 
-    private BulkFormatMapping createBulkFormatMapping(FormatKey key) {
+    private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
         TableSchema tableSchema = schema;
         TableSchema dataSchema =
                 key.schemaId == schema.id() ? schema : 
schemaManager.schema(key.schemaId);
@@ -180,37 +186,81 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
         RowType projectedRowType =
                 
Projection.of(dataProjection).project(dataSchema.logicalRowType());
 
-        return new BulkFormatMapping(
+        return new RawFileBulkFormatMapping(
                 indexCastMapping.getIndexMapping(),
                 indexCastMapping.getCastMapping(),
                 partitionPair,
                 formatDiscover
                         .discover(key.format)
-                        .createReaderFactory(projectedRowType, dataFilters));
+                        .createReaderFactory(projectedRowType, dataFilters),
+                dataSchema,
+                dataFilters);
     }
 
     private RecordReader<InternalRow> createFileReader(
             BinaryRow partition,
             DataFileMeta file,
             DataFilePathFactory dataFilePathFactory,
-            BulkFormatMapping bulkFormatMapping,
+            RawFileBulkFormatMapping bulkFormatMapping,
             DeletionVector.Factory dvFactory)
             throws IOException {
-        FileRecordReader fileRecordReader =
-                new FileRecordReader(
-                        bulkFormatMapping.getReaderFactory(),
-                        new FormatReaderContext(
-                                fileIO,
-                                dataFilePathFactory.toPath(file.fileName()),
-                                file.fileSize()),
-                        bulkFormatMapping.getIndexMapping(),
-                        bulkFormatMapping.getCastMapping(),
-                        
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
-
-        Optional<DeletionVector> deletionVector = 
dvFactory.create(file.fileName());
-        if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
-            return new ApplyDeletionVectorReader<>(fileRecordReader, 
deletionVector.get());
+        ConcatRecordReader.ReaderSupplier<InternalRow> supplier =
+                () -> {
+                    FileRecordReader fileRecordReader =
+                            new FileRecordReader(
+                                    bulkFormatMapping.getReaderFactory(),
+                                    new FormatReaderContext(
+                                            fileIO,
+                                            
dataFilePathFactory.toPath(file.fileName()),
+                                            file.fileSize()),
+                                    bulkFormatMapping.getIndexMapping(),
+                                    bulkFormatMapping.getCastMapping(),
+                                    PartitionUtils.create(
+                                            
bulkFormatMapping.getPartitionPair(), partition));
+
+                    Optional<DeletionVector> deletionVector = 
dvFactory.create(file.fileName());
+                    if (deletionVector.isPresent() && 
!deletionVector.get().isEmpty()) {
+                        return new ApplyDeletionVectorReader<>(
+                                fileRecordReader, deletionVector.get());
+                    }
+                    return fileRecordReader;
+                };
+
+        return fileIndexReadEnabled
+                ? new FileIndexRecordReader(
+                        fileIO,
+                        bulkFormatMapping.getDataSchema(),
+                        bulkFormatMapping.getDataFilters(),
+                        dataFilePathFactory,
+                        file,
+                        supplier)
+                : supplier.get();
+    }
+
+    /** Bulk format mapping with data schema and data filters. */
+    private static class RawFileBulkFormatMapping extends BulkFormatMapping {
+
+        private final TableSchema dataSchema;
+        private final List<Predicate> dataFilters;
+
+        public RawFileBulkFormatMapping(
+                int[] indexMapping,
+                CastFieldGetter[] castMapping,
+                Pair<int[], RowType> partitionPair,
+                FormatReaderFactory bulkFormat,
+                TableSchema dataSchema,
+                List<Predicate> dataFilters) {
+            super(indexMapping, castMapping, partitionPair, bulkFormat);
+            this.dataSchema = dataSchema;
+            this.dataFilters = dataFilters;
+        }
+
+        public TableSchema getDataSchema() {
+            return dataSchema;
+        }
+
+        public List<Predicate> getDataFilters() {
+            return dataFilters;
         }
-        return fileRecordReader;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
index 6733e1fea..d3eb0e416 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
@@ -18,13 +18,17 @@
 
 package org.apache.paimon.stats;
 
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.IndexCastMapping;
+import org.apache.paimon.schema.SchemaEvolutionUtil;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +75,17 @@ public class FieldStatsConverters {
                 });
     }
 
+    public Predicate convertFilter(long dataSchemaId, Predicate filter) {
+        return tableSchemaId == dataSchemaId
+                ? filter
+                : Objects.requireNonNull(
+                                SchemaEvolutionUtil.createDataFilters(
+                                        schemaFields.apply(tableSchemaId),
+                                        schemaFields.apply(dataSchemaId),
+                                        Collections.singletonList(filter)))
+                        .get(0);
+    }
+
     public List<DataField> tableDataFields() {
         return tableDataFields;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index e6209e223..21b8825f7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -186,6 +186,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
                 0,
                 0,
                 0,
-                0L);
+                0L,
+                null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 565be9f90..9211cfe60 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.disk.ChannelWithMeta;
 import org.apache.paimon.disk.ExternalBuffer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
@@ -604,7 +605,8 @@ public class AppendOnlyWriterTest {
                         CoreOptions.SPILL_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
                                 options, 
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
-                        MemorySize.MAX_VALUE);
+                        MemorySize.MAX_VALUE,
+                        new FileIndexOptions());
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         return Pair.of(writer, compactManager.allFiles());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index a96ee1429..3eff9b7cd 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -138,7 +138,8 @@ public class IndexBootstrapTest extends TableTestBase {
                         Instant.ofEpochMilli(timeMillis)
                                 .atZone(ZoneId.systemDefault())
                                 .toLocalDateTime()),
-                0L);
+                0L,
+                null);
     }
 
     private Pair<InternalRow, Integer> row(int pt, int col, int pk, int 
bucket) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index ffa290e0a..1a3503f7a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
@@ -89,7 +90,8 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         CoreOptions.SPILL_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
                                 options, SCHEMA.getFieldNames()),
-                        MemorySize.MAX_VALUE);
+                        MemorySize.MAX_VALUE,
+                        new FileIndexOptions());
         appendOnlyWriter.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         appendOnlyWriter.write(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index aed01ca21..3b4921c88 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -161,7 +161,8 @@ public class DataFileTestDataGenerator {
                         maxSequenceNumber,
                         0,
                         level,
-                        kvs.stream().filter(kv -> 
kv.valueKind().isRetract()).count()),
+                        kvs.stream().filter(kv -> 
kv.valueKind().isRetract()).count(),
+                        null),
                 kvs);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index b902ae967..d5f6ceda3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -51,7 +51,8 @@ public class DataFileTestUtils {
                 DataFileMeta.DUMMY_LEVEL,
                 Collections.emptyList(),
                 Timestamp.fromEpochMillis(100),
-                maxSeq - minSeq + 1);
+                maxSeq - minSeq + 1,
+                null);
     }
 
     public static DataFileMeta newFile() {
@@ -67,7 +68,8 @@ public class DataFileTestUtils {
                 0,
                 0,
                 0,
-                0L);
+                0L,
+                null);
     }
 
     public static DataFileMeta newFile(
@@ -89,7 +91,8 @@ public class DataFileTestUtils {
                 maxSequence,
                 0,
                 level,
-                deleteRowCount);
+                deleteRowCount,
+                null);
     }
 
     public static BinaryRow row(int i) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index d91b8a9a0..1801c2dd0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -87,7 +88,8 @@ public class RollingFileWriterTest {
                                         
CoreOptions.FILE_COMPRESSION.defaultValue(),
                                         
StatsCollectorFactories.createStatsFactories(
                                                 new CoreOptions(new 
HashMap<>()),
-                                                SCHEMA.getFieldNames())),
+                                                SCHEMA.getFieldNames()),
+                                        new FileIndexOptions()),
                         TARGET_FILE_SIZE);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index ad5fe2f80..473b333a6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -115,6 +115,7 @@ public class ManifestCommittableSerializerTest {
                 1,
                 0,
                 level,
-                0L);
+                0L,
+                null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e066eeaf9..9120e7b6c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -82,7 +82,8 @@ public abstract class ManifestFileMetaTestBase {
                         0, // not used
                         Collections.emptyList(),
                         Timestamp.fromEpochMillis(200000),
-                        0L // not used
+                        0L, // not used
+                        null // not used
                         ));
     }
 
@@ -243,6 +244,7 @@ public abstract class ManifestFileMetaTestBase {
                         0, // not used
                         0, // not used
                         0, // not used
-                        0L));
+                        0L,
+                        null));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index c424b6094..4763585b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,6 +69,18 @@ public class LevelsTest {
 
     public static DataFileMeta newFile(int level) {
         return new DataFileMeta(
-                UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, 
null, 0, 1, 0, level, 0L);
+                UUID.randomUUID().toString(),
+                0,
+                1,
+                row(0),
+                row(0),
+                null,
+                null,
+                0,
+                1,
+                0,
+                level,
+                0L,
+                null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index c4edd76e2..89007a33a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -181,7 +181,8 @@ public class IntervalPartitionTest {
                 0,
                 Collections.emptyList(),
                 Timestamp.fromEpochMillis(100000),
-                0L);
+                0L,
+                null);
     }
 
     private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> 
sections) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 313f9799a..0d891f2c7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -357,6 +357,6 @@ public class UniversalCompactionTest {
     }
 
     static DataFileMeta file(long size) {
-        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 
0, 0L);
+        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 
0, 0L, null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index c8c7be7e1..de95819ad 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -200,7 +200,8 @@ public class ExpireSnapshotsTest {
                         0,
                         extraFiles,
                         Timestamp.now(),
-                        0L);
+                        0L,
+                        null);
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, 
dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 
0, 1, dataFile);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 3846ab234..6893649d5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -20,14 +20,19 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
@@ -41,6 +46,9 @@ import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
@@ -348,6 +356,129 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testBloomFilterInMemory() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("index_column", DataTypes.STRING())
+                        .field("index_column2", DataTypes.INT())
+                        .field("index_column3", DataTypes.BIGINT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "index_column, index_column2, 
index_column3");
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column.items",
+                                    "150");
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column2.items",
+                                    "150");
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column3.items",
+                                    "150");
+                            options.set(
+                                    
CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
+                        });
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+        write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+
+        TableScan.Plan plan =
+                table.newScan()
+                        .withFilter(
+                                new PredicateBuilder(rowType)
+                                        .equal(1, 
BinaryString.fromString("b")))
+                        .plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(split -> ((DataSplit) 
split).dataFiles().stream())
+                        .collect(Collectors.toList());
+        assertThat(metas.size()).isEqualTo(1);
+    }
+
+    @Test
+    public void testBloomFilterInDisk() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("index_column", DataTypes.STRING())
+                        .field("index_column2", DataTypes.INT())
+                        .field("index_column3", DataTypes.BIGINT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "index_column, index_column2, 
index_column3");
+                            
options.set(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
+                        });
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+        write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+
+        TableScan.Plan plan =
+                table.newScan()
+                        .withFilter(
+                                new PredicateBuilder(rowType)
+                                        .equal(1, 
BinaryString.fromString("b")))
+                        .plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(split -> ((DataSplit) 
split).dataFiles().stream())
+                        .collect(Collectors.toList());
+        assertThat(metas.size()).isEqualTo(2);
+
+        RecordReader<InternalRow> reader =
+                table.newRead()
+                        .withFilter(
+                                new PredicateBuilder(rowType)
+                                        .equal(1, 
BinaryString.fromString("b")))
+                        .createReader(plan.splits());
+        reader.forEachRemaining(row -> 
assertThat(row.getString(1).toString()).isEqualTo("b"));
+    }
+
     @Test
     public void testStreamingProjection() throws Exception {
         writeData();
@@ -596,4 +727,22 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 ""));
         return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
+
+    protected FileStoreTable createUnawareBucketFileStoreTable(
+            RowType rowType, Consumer<Options> configure) throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.BUCKET, -1);
+        configure.accept(conf);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a10413005..8223afa5f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -55,7 +55,8 @@ public class SplitGeneratorTest {
                 maxSequence,
                 0,
                 0,
-                0L);
+                0L,
+                null);
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index d428bd29a..c096371d3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -307,12 +307,22 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
         assertThat(sql("SELECT * FROM append_table LIMIT 1")).hasSize(1);
     }
 
+    @Test
+    public void testFileIndex() {
+        batchSql(
+                "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 
'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
+
+        assertThat(batchSql("SELECT * FROM index_table WHERE indexc = 'c'"))
+                .containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, 
"c", "BBB"));
+    }
+
     @Override
     protected List<String> ddl() {
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) 
WITH ('bucket' = '-1')",
                 "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, 
dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
-                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data 
MAP<INT, INT>) WITH ('bucket' = '-1')");
+                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data 
MAP<INT, INT>) WITH ('bucket' = '-1')",
+                "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc 
STRING, data STRING) WITH ('bucket' = '-1', 
'file-index.bloom-filter.columns'='indexc', 
'file-index.bloom-filter.indexc.items' = '500')");
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index c607e7a8f..42c125fbb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -75,6 +75,7 @@ public class CompactionTaskSimpleSerializerTest {
                 1,
                 0,
                 0,
-                0L);
+                0L,
+                null);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 4f53932a2..ef364a1ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -112,7 +112,8 @@ public class FileStoreSourceSplitGeneratorTest {
                             0, // not used
                             0, // not used
                             0, // not used
-                            0L // not used
+                            0L, // not used
+                            null // not used
                             ));
         }
         return DataSplit.builder()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index cbe6cb86d..8d7d4c04e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -85,7 +85,8 @@ public class FileStoreSourceSplitSerializerTest {
                 1,
                 0,
                 level,
-                0L);
+                0L,
+                null);
     }
 
     public static FileStoreSourceSplit newSourceSplit(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 7fd14b1ad..edd6688da 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -137,7 +137,8 @@ public class TestChangelogDataReadWrite {
                         schema,
                         VALUE_TYPE,
                         FileFormatDiscover.of(options),
-                        pathFactory);
+                        pathFactory,
+                        options.fileIndexReadEnabled());
         return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }
 


Reply via email to