This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7524767e6 [flink] Introduce file index rewriter (#3391)
7524767e6 is described below

commit 7524767e6ff9af15f34b4b6990d726c3e9d9485d
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jun 20 15:45:23 2024 +0800

    [flink] Introduce file index rewriter (#3391)
---
 docs/content/concepts/specification.md             |  34 +-
 docs/content/flink/procedures.md                   |  17 +
 .../apache/paimon/fileindex/FileIndexFormat.java   |  20 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    |  20 ++
 .../org/apache/paimon/io/DataFilePathFactory.java  |  19 ++
 .../java/org/apache/paimon/io/FileIndexWriter.java |  54 ++-
 .../paimon/manifest/ManifestEntrySerializer.java   |  18 +
 .../paimon/flink/procedure/FileIndexProcedure.java | 146 +++++++++
 .../paimon/flink/sink/CommittableTypeInfo.java     |  15 +-
 .../paimon/flink/sink/CompactionTaskTypeInfo.java  |  16 +-
 .../apache/paimon/flink/sink/FileIndexSink.java    | 362 +++++++++++++++++++++
 .../flink/sink/MultiTableCommittableTypeInfo.java  |  16 +-
 ...CopyVersionedSerializerTypeSerializerProxy.java |  43 +++
 .../paimon/flink/source/FileIndexScanSource.java   | 299 +++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../flink/procedure/FileIndexProcedureITCase.java  | 235 +++++++++++++
 16 files changed, 1254 insertions(+), 61 deletions(-)

diff --git a/docs/content/concepts/specification.md 
b/docs/content/concepts/specification.md
index 3116f2eed..8463b51ed 100644
--- a/docs/content/concepts/specification.md
+++ b/docs/content/concepts/specification.md
@@ -247,7 +247,33 @@ Global Index is in the index directory, currently, only 
two places will use glob
 
 ## Data File Index
 
-Define `file-index.bloom-filter.columns`, Paimon will create its corresponding 
index file for each file. If the index
-file is too small, it will be stored directly in the manifest, or in the 
directory of the data file. Each data file
-corresponds to an index file, which has a separate file definition and can 
contain different types of indexes with
-multiple columns.
+### Concept
+
+Data file index is an external index file corresponding to a certain data 
file. If the index file is too small, it will
+be stored directly in the manifest, otherwise in the directory of the data 
file. Each data file corresponds to an index file, 
+which has a separate file definition and can contain different types of 
indexes with multiple columns.
+
+### Usage
+
+Different file indexes are efficient in different scenarios. For example, a bloom 
filter may speed up queries in point-lookup
+scenarios. Using a bitmap may consume more space but can result in greater 
accuracy. Although only the bloom filter is implemented 
+currently, other types of index will be supported in the future. 
+
+Currently, file indexes are only supported on append-only tables.
+
+`Bloom Filter`:
+* `file-index.bloom-filter.columns`: specify the columns that need a bloom 
filter index.
+* `file-index.bloom-filter.<column_name>.fpp`: configure the false positive 
probability.
+* `file-index.bloom-filter.<column_name>.items`: configure the expected 
number of distinct items in one data file.
+
+
+More filter types will be supported...
+
+### Procedure
+
+If you want to add a file index to an existing table without rewriting its data, you can 
use the `file_index_rewrite` procedure. Before
+you use the procedure, you should set the appropriate configurations on the target 
table. You can use the ALTER clause to set 
+`file-index.<filter-type>.columns` on the table. 
+
+How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
+
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index cfa905628..c855ee1bf 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -273,6 +273,23 @@ All available procedures are listed below.
             <li>tableName: the target table identifier.</li>
       </td>
       <td>CALL sys.repair('test_db.T')</td>
+   </tr>
+    <tr>
+      <td>file_index_rewrite</td>
+      <td>
+         CALL sys.file_index_rewrite(&lt;identifier&gt; [, 
&lt;partitions&gt;])<br/><br/>
+      </td>
+      <td>
+         Rewrite the file index for the table. Argument:
+            <li>identifier: &lt;databaseName&gt;.&lt;tableName&gt;.</li>
+            <li>partitions : partition filter.</li>
+      </td>
+      <td>
+         -- rewrite the file index for the whole table<br/>
+         CALL sys.file_index_rewrite('test_db.T')<br/><br/>
+         -- rewrite the file index for a specific partition<br/>
+         CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
+     </td>
    </tr>
    </tbody>
 </table>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index 4541cc46d..07ee5a18a 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -137,13 +138,13 @@ public final class FileIndexFormat {
         public void writeColumnIndexes(Map<String, Map<String, byte[]>> 
indexes)
                 throws IOException {
 
-            Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new 
HashMap<>();
+            Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new 
LinkedHashMap<>();
 
             // construct body
             ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
             for (Map.Entry<String, Map<String, byte[]>> columnMap : 
indexes.entrySet()) {
                 Map<String, Pair<Integer, Integer>> innerMap =
-                        bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new 
HashMap<>());
+                        bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new 
LinkedHashMap<>());
                 Map<String, byte[]> bytesMap = columnMap.getValue();
                 for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
                     int startPosition = baos.size();
@@ -335,6 +336,21 @@ public final class FileIndexFormat {
             return b;
         }
 
+        public Map<String, Map<String, byte[]>> readAll() {
+            Map<String, Map<String, byte[]>> result = new HashMap<>();
+            for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> 
entryOuter :
+                    header.entrySet()) {
+                for (Map.Entry<String, Pair<Integer, Integer>> entryInner :
+                        entryOuter.getValue().entrySet()) {
+                    result.computeIfAbsent(entryOuter.getKey(), key -> new 
HashMap<>())
+                            .put(
+                                    entryInner.getKey(),
+                                    
getBytesWithStartAndLength(entryInner.getValue()));
+                }
+            }
+            return result;
+        }
+
         @VisibleForTesting
         // only for test yet
         Optional<byte[]> getBytesWithNameAndType(String columnName, String 
indexType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 834710725..7b2785261 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -376,6 +376,26 @@ public class DataFileMeta {
                 fileSource);
     }
 
+    public DataFileMeta copy(byte[] newEmbeddedIndex) {
+        return new DataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                creationTime,
+                deleteRowCount,
+                newEmbeddedIndex,
+                fileSource);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == this) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index ef83e8598..f7da2760e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -76,6 +76,25 @@ public class DataFilePathFactory {
         return new Path(filePath.getParent(), filePath.getName() + 
INDEX_PATH_SUFFIX);
     }
 
+    public static Path fileIndexPathIncrease(Path filePath) {
+        String fileName = filePath.getName();
+        int dot = fileName.lastIndexOf(".");
+        int dash = fileName.lastIndexOf("-");
+
+        if (dash != -1) {
+            try {
+                int num = Integer.parseInt(fileName.substring(dash + 1, dot));
+                return new Path(
+                        filePath.getParent(),
+                        fileName.substring(0, dash + 1) + (num + 1) + 
INDEX_PATH_SUFFIX);
+            } catch (NumberFormatException e) {
+                // ignore
+            }
+        }
+        return new Path(
+                filePath.getParent(), fileName.substring(0, dot) + "-" + 1 + 
INDEX_PATH_SUFFIX);
+    }
+
     public static String formatIdentifier(String fileName) {
         int index = fileName.lastIndexOf('.');
         if (index == -1) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
index 0ec473fed..c21d5418a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -64,7 +64,11 @@ public final class FileIndexWriter implements Closeable {
     private byte[] embeddedIndexBytes;
 
     public FileIndexWriter(
-            FileIO fileIO, Path path, RowType rowType, FileIndexOptions 
fileIndexOptions) {
+            FileIO fileIO,
+            Path path,
+            RowType rowType,
+            FileIndexOptions fileIndexOptions,
+            @Nullable Map<String, String> evolutionMap) {
         this.fileIO = fileIO;
         this.path = path;
         List<DataField> fields = rowType.getFields();
@@ -78,7 +82,15 @@ public final class FileIndexWriter implements Closeable {
         for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
                 fileIndexOptions.entrySet()) {
             FileIndexOptions.Column entryColumn = entry.getKey();
-            String columnName = entryColumn.getColumnName();
+            String tempName = entryColumn.getColumnName();
+            if (evolutionMap != null) {
+                tempName = evolutionMap.getOrDefault(tempName, null);
+                if (tempName == null) {
+                    continue;
+                }
+            }
+
+            final String columnName = tempName;
             DataField field = map.get(columnName);
             if (field == null) {
                 throw new IllegalArgumentException(columnName + " does not 
exist in column fields");
@@ -135,16 +147,7 @@ public final class FileIndexWriter implements Closeable {
 
     @Override
     public void close() throws IOException {
-        Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
-
-        for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
-            Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
-            for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
-                indexMaps
-                        .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
-                        .put(indexMaintainer.getIndexType(), entry.getValue());
-            }
-        }
+        Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try (FileIndexFormat.Writer writer = 
FileIndexFormat.createWriter(baos)) {
@@ -152,7 +155,7 @@ public final class FileIndexWriter implements Closeable {
         }
 
         if (baos.size() > inManifestThreshold) {
-            try (OutputStream outputStream = fileIO.newOutputStream(path, 
false)) {
+            try (OutputStream outputStream = fileIO.newOutputStream(path, 
true)) {
                 outputStream.write(baos.toByteArray());
             }
             resultFileName = path.getName();
@@ -161,6 +164,19 @@ public final class FileIndexWriter implements Closeable {
         }
     }
 
+    public Map<String, Map<String, byte[]>> serializeMaintainers() {
+        Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
+        for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
+            Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
+            for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
+                indexMaps
+                        .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
+                        .put(indexMaintainer.getIndexType(), entry.getValue());
+            }
+        }
+        return indexMaps;
+    }
+
     public FileIndexResult result() {
         return FileIndexResult.of(embeddedIndexBytes, resultFileName);
     }
@@ -168,9 +184,19 @@ public final class FileIndexWriter implements Closeable {
     @Nullable
     public static FileIndexWriter create(
             FileIO fileIO, Path path, RowType rowType, FileIndexOptions 
fileIndexOptions) {
+        return create(fileIO, path, rowType, fileIndexOptions, null);
+    }
+
+    @Nullable
+    public static FileIndexWriter create(
+            FileIO fileIO,
+            Path path,
+            RowType rowType,
+            FileIndexOptions fileIndexOptions,
+            @Nullable Map<String, String> evolutionMap) {
         return fileIndexOptions.isEmpty()
                 ? null
-                : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
+                : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, 
evolutionMap);
     }
 
     /** File index result. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index 83c2be59a..6d3e1a961 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -23,8 +23,13 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.function.Function;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -77,6 +82,19 @@ public class ManifestEntrySerializer extends 
VersionedObjectSerializer<ManifestE
                 dataFileMetaSerializer.fromRow(row.getRow(4, 
dataFileMetaSerializer.numFields())));
     }
 
+    public byte[] serializeToBytes(ManifestEntry manifestEntry) throws 
IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+        serialize(manifestEntry, view);
+        return out.toByteArray();
+    }
+
+    public ManifestEntry deserializeFromBytes(byte[] bytes) throws IOException 
{
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+        return deserialize(view);
+    }
+
     public static Function<InternalRow, BinaryRow> partitionGetter() {
         return row -> deserializeBinaryRow(row.getBinary(2));
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
new file mode 100644
index 000000000..21582ae28
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.FileIndexSink;
+import 
org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
+import org.apache.paimon.flink.source.FileIndexScanSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+
+/** Procedure to rewrite the file index of a table, optionally restricted to given partitions. */
+public class FileIndexProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "file_index_rewrite";
+    }
+
+    public String[] call(ProcedureContext procedureContext, String 
sourceTablePath)
+            throws Exception {
+        return call(procedureContext, sourceTablePath, "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String sourceTablePath, String 
partitions)
+            throws Exception {
+
+        StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+        Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
+
+        List<Map<String, String>> partitionList =
+                StringUtils.isBlank(partitions) ? null : 
getPartitions(partitions.split(";"));
+
+        Predicate partitionPredicate;
+        if (partitionList != null) {
+            // This predicate is based on the row type of the original table, 
not bucket table.
+            // Because TableScan in BucketsTable is the same with 
FileStoreTable,
+            // and partition filter is done by scan.
+            partitionPredicate =
+                    PredicateBuilder.or(
+                            partitionList.stream()
+                                    .map(
+                                            p ->
+                                                    PredicateBuilder.partition(
+                                                            p,
+                                                            table.rowType(),
+                                                            
CoreOptions.PARTITION_DEFAULT_NAME
+                                                                    
.defaultValue()))
+                                    .toArray(Predicate[]::new));
+        } else {
+            partitionPredicate = null;
+        }
+
+        TopoBuilder.build(env, (FileStoreTable) table, partitionPredicate);
+
+        return execute(env, "Add file index for table: " + sourceTablePath);
+    }
+
+    private static class TopoBuilder {
+
+        public static void build(
+                StreamExecutionEnvironment env,
+                FileStoreTable table,
+                Predicate partitionPredicate) {
+            DataStreamSource<ManifestEntry> source =
+                    env.fromSource(
+                            new FileIndexScanSource(table, partitionPredicate),
+                            WatermarkStrategy.noWatermarks(),
+                            "index source",
+                            new TypeInfo());
+            new FileIndexSink(table).sinkFrom(source);
+        }
+    }
+
+    private static class TypeInfo extends GenericTypeInfo<ManifestEntry> {
+
+        public TypeInfo() {
+            super(ManifestEntry.class);
+        }
+
+        @Override
+        public TypeSerializer<ManifestEntry> createSerializer(SerializerConfig 
config) {
+            return new NoneCopyVersionedSerializerTypeSerializerProxy<>(
+                    () ->
+                            new SimpleVersionedSerializer<ManifestEntry>() {
+                                private final ManifestEntrySerializer 
manifestEntrySerializer =
+                                        new ManifestEntrySerializer();
+
+                                @Override
+                                public int getVersion() {
+                                    return 0;
+                                }
+
+                                @Override
+                                public byte[] serialize(ManifestEntry 
manifestEntry)
+                                        throws IOException {
+                                    return 
manifestEntrySerializer.serializeToBytes(manifestEntry);
+                                }
+
+                                @Override
+                                public ManifestEntry deserialize(int i, byte[] 
bytes)
+                                        throws IOException {
+                                    return 
manifestEntrySerializer.deserializeFromBytes(bytes);
+                                }
+                            });
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
index a29956bf7..dcb87238b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
@@ -23,7 +23,6 @@ import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
 
 /** Type information of {@link Committable}. */
 public class CommittableTypeInfo extends TypeInformation<Committable> {
@@ -61,18 +60,8 @@ public class CommittableTypeInfo extends 
TypeInformation<Committable> {
     @Override
     public TypeSerializer<Committable> createSerializer(ExecutionConfig 
config) {
         // no copy, so that data from writer is directly going into committer 
while chaining
-        return new SimpleVersionedSerializerTypeSerializerProxy<Committable>(
-                () -> new CommittableSerializer(new 
CommitMessageSerializer())) {
-            @Override
-            public Committable copy(Committable from) {
-                return from;
-            }
-
-            @Override
-            public Committable copy(Committable from, Committable reuse) {
-                return from;
-            }
-        };
+        return new NoneCopyVersionedSerializerTypeSerializerProxy<Committable>(
+                () -> new CommittableSerializer(new 
CommitMessageSerializer())) {};
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
index c33ab95e1..1c3d1e8dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
@@ -24,7 +24,6 @@ import org.apache.paimon.table.sink.CompactionTaskSerializer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
 
 /** Type information of {@link AppendOnlyCompactionTask}. */
 public class CompactionTaskTypeInfo extends 
TypeInformation<AppendOnlyCompactionTask> {
@@ -62,19 +61,8 @@ public class CompactionTaskTypeInfo extends 
TypeInformation<AppendOnlyCompaction
     @Override
     public TypeSerializer<AppendOnlyCompactionTask> 
createSerializer(ExecutionConfig config) {
         // we don't need copy for task
-        return new 
SimpleVersionedSerializerTypeSerializerProxy<AppendOnlyCompactionTask>(
-                () -> new CompactionTaskSimpleSerializer(new 
CompactionTaskSerializer())) {
-            @Override
-            public AppendOnlyCompactionTask copy(AppendOnlyCompactionTask 
from) {
-                return from;
-            }
-
-            @Override
-            public AppendOnlyCompactionTask copy(
-                    AppendOnlyCompactionTask from, AppendOnlyCompactionTask 
reuse) {
-                return from;
-            }
-        };
+        return new 
NoneCopyVersionedSerializerTypeSerializerProxy<AppendOnlyCompactionTask>(
+                () -> new CompactionTaskSimpleSerializer(new 
CompactionTaskSerializer())) {};
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
new file mode 100644
index 000000000..a69f3f23c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexCommon;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.flink.procedure.FileIndexProcedure;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.FileIndexWriter;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.io.DataFilePathFactory.fileIndexPathIncrease;
+import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+
+/** File index sink for {@link FileIndexProcedure}. */
+public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
+
+    /**
+     * Creates a sink for rewriting the file index of the given table.
+     *
+     * @param table table whose data files get their file index rewritten
+     */
+    public FileIndexSink(FileStoreTable table) {
+        // NOTE(review): the second super argument is passed as null; its meaning is defined by
+        // FlinkWriteSink, which is not visible here.
+        super(table, null);
+    }
+
+    /**
+     * Builds the operator that rewrites file indexes. Neither {@code writeProvider} nor {@code
+     * commitUser} is used by this sink.
+     */
+    @Override
+    protected OneInputStreamOperator<ManifestEntry, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        Options operatorOptions = table.coreOptions().toConfiguration();
+        return new FileIndexModificationOperator(operatorOptions, table);
+    }
+
+    /**
+     * Operator that rewrites the file index of every incoming {@link ManifestEntry} and buffers
+     * one {@link CommitMessage} per entry until the next {@code prepareCommit} call.
+     */
+    private static class FileIndexModificationOperator
+            extends PrepareCommitOperator<ManifestEntry, Committable> {
+
+        private final FileStoreTable table;
+
+        // Both are created in setup(), so they are transient.
+        private transient FileIndexProcessor fileIndexProcessor;
+        private transient List<CommitMessage> messages;
+
+        public FileIndexModificationOperator(Options options, FileStoreTable table) {
+            super(options);
+            this.table = table;
+        }
+
+        @Override
+        public void setup(
+                StreamTask<?, ?> containingTask,
+                StreamConfig config,
+                Output<StreamRecord<Committable>> output) {
+            super.setup(containingTask, config, output);
+
+            fileIndexProcessor = new FileIndexProcessor(table);
+            messages = new ArrayList<>();
+        }
+
+        /**
+         * Rewrites the index of the entry's data file and records a compact increment that
+         * replaces the original file meta with the re-indexed one.
+         */
+        @Override
+        public void processElement(StreamRecord<ManifestEntry> element) throws Exception {
+            ManifestEntry manifestEntry = element.getValue();
+            BinaryRow entryPartition = manifestEntry.partition();
+            int entryBucket = manifestEntry.bucket();
+            DataFileMeta before = manifestEntry.file();
+            DataFileMeta after = fileIndexProcessor.process(entryPartition, entryBucket, before);
+
+            CompactIncrement replacement =
+                    new CompactIncrement(
+                            Collections.singletonList(before),
+                            Collections.singletonList(after),
+                            Collections.emptyList());
+            messages.add(
+                    new CommitMessageImpl(
+                            entryPartition,
+                            entryBucket,
+                            DataIncrement.emptyIncrement(),
+                            replacement));
+        }
+
+        /** Drains the buffered messages into FILE committables tagged with the checkpoint id. */
+        @Override
+        protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+                throws IOException {
+            List<Committable> committables = new ArrayList<>(messages.size());
+            for (CommitMessage message : messages) {
+                committables.add(new Committable(checkpointId, Committable.Kind.FILE, message));
+            }
+            messages.clear();
+            return committables;
+        }
+    }
+
+    /**
+     * Does the file index rewrite: rebuilds the file index of a single data file according to the
+     * table's current file index options.
+     */
+    public static class FileIndexProcessor {
+
+        private final FileStoreTable table;
+        private final FileIndexOptions fileIndexOptions;
+        private final FileIO fileIO;
+        private final FileStorePathFactory pathFactory;
+        // one path factory per (partition, bucket), created lazily
+        private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap;
+        private final SchemaCache schemaCache;
+        // serialized indexes larger than this are written to a separate index file
+        private final long sizeInMeta;
+
+        public FileIndexProcessor(FileStoreTable table) {
+            this.table = table;
+            this.fileIndexOptions = table.coreOptions().indexColumnsOptions();
+            this.fileIO = table.fileIO();
+            this.pathFactory = table.store().pathFactory();
+            this.dataFilePathFactoryMap = new HashMap<>();
+            this.schemaCache =
+                    new SchemaCache(fileIndexOptions, new SchemaManager(fileIO, table.location()));
+            this.sizeInMeta = table.coreOptions().fileIndexInManifestThreshold();
+        }
+
+        /**
+         * Rewrites the file index of {@code dataFileMeta}.
+         *
+         * <p>Steps: load the existing index file (if one is listed in the extra files), drop the
+         * columns and index types that are no longer configured, re-read the data file to build
+         * the configured indexes, merge them over what was kept, and serialize the result.
+         *
+         * @param partition partition of the data file
+         * @param bucket bucket of the data file
+         * @param dataFileMeta meta of the data file to re-index
+         * @return a copy of {@code dataFileMeta}: with the new index file appended to the extra
+         *     files when the serialized index exceeds the threshold, with only the old index file
+         *     removed when the index is empty, or otherwise created from the serialized index bytes
+         * @throws IOException if reading the old index, reading the data file or writing the new
+         *     index file fails
+         */
+        public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFileMeta)
+                throws IOException {
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            Pair.of(partition, bucket),
+                            p -> pathFactory.createDataFilePathFactory(partition, bucket));
+
+            Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>> t4 =
+                    schemaCache.schemaInfo(dataFileMeta.schemaId());
+            RowType fileSchema = t4.f0;
+            Map<String, String> evolutionNameMap = t4.f1;
+            int[] projection = t4.f2;
+            Map<String, Set<String>> expectedFileIndex = t4.f3;
+
+            // split the extra files into index files and everything else
+            List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
+            List<String> indexFiles =
+                    dataFileMeta.extraFiles().stream()
+                            .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+                            .collect(Collectors.toList());
+            extras.removeAll(indexFiles);
+
+            Path newIndexPath;
+            Map<String, Map<String, byte[]>> maintainers;
+            // load
+            if (!indexFiles.isEmpty()) {
+                // only the first index file is read
+                String indexFile = indexFiles.get(0);
+                try (FileIndexFormat.Reader indexReader =
+                        FileIndexFormat.createReader(
+                                fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)),
+                                fileSchema)) {
+                    maintainers = indexReader.readAll();
+                }
+                newIndexPath = fileIndexPathIncrease(dataFilePathFactory.toPath(indexFile));
+            } else {
+                maintainers = new HashMap<>();
+                newIndexPath = toFileIndexPath(dataFilePathFactory.toPath(dataFileMeta.fileName()));
+            }
+
+            // remove unnecessary: columns that are no longer configured go away entirely, and
+            // the remaining columns keep only the index types that are still configured. The
+            // previous loop tested each key against the very map it came from, so it never
+            // removed an index type.
+            maintainers.keySet().retainAll(expectedFileIndex.keySet());
+            for (Map.Entry<String, Map<String, byte[]>> entry : maintainers.entrySet()) {
+                entry.getValue().keySet().retainAll(expectedFileIndex.get(entry.getKey()));
+            }
+
+            // ignore close: the writer is never closed here, only its serialized maintainers
+            // are used
+            FileIndexWriter fileIndexWriter =
+                    FileIndexWriter.create(
+                            fileIO,
+                            newIndexPath,
+                            fileSchema.project(projection),
+                            fileIndexOptions,
+                            evolutionNameMap);
+            if (fileIndexWriter != null) {
+                try (RecordReader<InternalRow> reader =
+                        table.newReadBuilder()
+                                .withProjection(projection)
+                                .newRead()
+                                .createReader(
+                                        DataSplit.builder()
+                                                .withPartition(partition)
+                                                .withBucket(bucket)
+                                                .withBucketPath(
+                                                        pathFactory
+                                                                .bucketPath(partition, bucket)
+                                                                .toString())
+                                                .withDataFiles(
+                                                        Collections.singletonList(dataFileMeta))
+                                                .rawConvertible(true)
+                                                .build())) {
+                    reader.forEachRemaining(fileIndexWriter::write);
+                }
+
+                // freshly built indexes override kept ones with the same column and index type
+                fileIndexWriter
+                        .serializeMaintainers()
+                        .forEach(
+                                (key, value) ->
+                                        maintainers
+                                                .computeIfAbsent(key, k -> new HashMap<>())
+                                                .putAll(value));
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (FileIndexFormat.Writer indexWriter = FileIndexFormat.createWriter(baos)) {
+                if (!maintainers.isEmpty()) {
+                    indexWriter.writeColumnIndexes(maintainers);
+                }
+            }
+
+            if (baos.size() > sizeInMeta) {
+                // NOTE(review): the boolean argument is presumably "overwrite" - confirm
+                try (OutputStream outputStream = fileIO.newOutputStream(newIndexPath, true)) {
+                    outputStream.write(baos.toByteArray());
+                }
+                extras.add(newIndexPath.getName());
+                return dataFileMeta.copy(extras);
+            } else if (baos.size() == 0) {
+                return dataFileMeta.copy(extras);
+            } else {
+                // NOTE(review): presumably embeds the index bytes in the file meta; the old
+                // index file name is not removed from the extra files on this path - confirm
+                return dataFileMeta.copy(baos.toByteArray());
+            }
+        }
+    }
+
+    /**
+     * Caches, per schema id, the information needed to re-index files written with that schema:
+     * the file's row type, the latest-name to file-name column mapping ({@code null} for the
+     * latest schema), the projection of indexed columns, and the expected index types per column.
+     */
+    private static class SchemaCache {
+
+        private final FileIndexOptions fileIndexOptions;
+        private final SchemaManager schemaManager;
+        private final TableSchema latest;
+        private final Map<
+                        Long, Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>>>
+                schemaInfos;
+
+        public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager schemaManager) {
+            this.fileIndexOptions = fileIndexOptions;
+            this.schemaManager = schemaManager;
+            this.latest = schemaManager.latest().orElseThrow(RuntimeException::new);
+            this.schemaInfos = new HashMap<>();
+        }
+
+        /** Returns the cached info for {@code schemaId}, computing it on first use. */
+        public Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>> schemaInfo(
+                long schemaId) {
+            Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>> info =
+                    schemaInfos.get(schemaId);
+            if (info == null) {
+                info = buildSchemaInfo(schemaId);
+                schemaInfos.put(schemaId, info);
+            }
+            return info;
+        }
+
+        /** Computes the schema info for one schema id; see the class comment for the fields. */
+        private Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>>
+                buildSchemaInfo(long schemaId) {
+            RowType fileSchema = schemaManager.schema(schemaId).logicalRowType();
+            Map<String, String> nameMapping = null;
+            if (schemaId != latest.id()) {
+                nameMapping = createIndexNameMapping(latest.fields(), fileSchema.getFields());
+            }
+
+            List<String> projectedColumnNames = new ArrayList<>();
+            Map<String, Set<String>> expectedIndexNameType = new HashMap<>();
+            for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
+                    fileIndexOptions.entrySet()) {
+                FileIndexOptions.Column column = entry.getKey();
+                String columnName;
+                if (nameMapping == null) {
+                    columnName = column.getColumnName();
+                } else {
+                    columnName = nameMapping.get(column.getColumnName());
+                    // if column name has no corresponding field, then we just skip it
+                    if (columnName == null) {
+                        continue;
+                    }
+                }
+                projectedColumnNames.add(columnName);
+
+                // NOTE(review): the non-nested branch uses the configured column name rather
+                // than the mapped file column name - confirm this is intended for renamed columns
+                String fullColumnName;
+                if (column.isNestedColumn()) {
+                    fullColumnName =
+                            FileIndexCommon.toMapKey(columnName, column.getNestedColumnName());
+                } else {
+                    fullColumnName = column.getColumnName();
+                }
+                expectedIndexNameType
+                        .computeIfAbsent(fullColumnName, name -> new HashSet<>())
+                        .addAll(entry.getValue().keySet());
+            }
+
+            int[] projection = new int[projectedColumnNames.size()];
+            for (int i = 0; i < projection.length; i++) {
+                projection[i] = fileSchema.getFieldIndex(projectedColumnNames.get(i));
+            }
+            return Tuple4.of(fileSchema, nameMapping, projection, expectedIndexNameType);
+        }
+
+        /**
+         * Maps the name of each table field to the name of the data field with the same field id.
+         * Data fields whose id is absent from {@code tableFields} are left out.
+         */
+        private static Map<String, String> createIndexNameMapping(
+                List<DataField> tableFields, List<DataField> dataFields) {
+            Map<Integer, String> tableNameById = new HashMap<>();
+            for (DataField tableField : tableFields) {
+                tableNameById.put(tableField.id(), tableField.name());
+            }
+
+            Map<String, String> indexMapping = new HashMap<>();
+            for (DataField dataField : dataFields) {
+                String tableName = tableNameById.get(dataField.id());
+                if (tableName != null) {
+                    indexMapping.put(tableName, dataField.name());
+                }
+            }
+            return indexMapping;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
index 48efd349d..f82f08209 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
@@ -23,7 +23,6 @@ import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
 
 /** Type information of {@link MultiTableCommittable}. */
 public class MultiTableCommittableTypeInfo extends 
TypeInformation<MultiTableCommittable> {
@@ -61,19 +60,8 @@ public class MultiTableCommittableTypeInfo extends 
TypeInformation<MultiTableCom
     @Override
     public TypeSerializer<MultiTableCommittable> 
createSerializer(ExecutionConfig config) {
         // no copy, so that data from writer is directly going into committer 
while chaining
-        return new 
SimpleVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
-                () -> new MultiTableCommittableSerializer(new 
CommitMessageSerializer())) {
-            @Override
-            public MultiTableCommittable copy(MultiTableCommittable from) {
-                return from;
-            }
-
-            @Override
-            public MultiTableCommittable copy(
-                    MultiTableCommittable from, MultiTableCommittable reuse) {
-                return from;
-            }
-        };
+        return new 
NoneCopyVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
+                () -> new MultiTableCommittableSerializer(new 
CommitMessageSerializer())) {};
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoneCopyVersionedSerializerTypeSerializerProxy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoneCopyVersionedSerializerTypeSerializerProxy.java
new file mode 100644
index 000000000..3df58eb93
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoneCopyVersionedSerializerTypeSerializerProxy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.util.function.SerializableSupplier;
+
+/**
+ * No copy {@link SimpleVersionedSerializerTypeSerializerProxy}: both {@code copy} methods return
+ * the given instance itself instead of creating a copy.
+ *
+ * @param <T> type handled by the wrapped {@link SimpleVersionedSerializer}
+ */
+public class NoneCopyVersionedSerializerTypeSerializerProxy<T>
+        extends SimpleVersionedSerializerTypeSerializerProxy<T> {
+
+    /**
+     * @param serializerSupplier supplier of the wrapped serializer, forwarded to the super class
+     */
+    public NoneCopyVersionedSerializerTypeSerializerProxy(
+            SerializableSupplier<SimpleVersionedSerializer<T>> 
serializerSupplier) {
+        super(serializerSupplier);
+    }
+
+    /** Returns {@code from} as is; no copy is made. */
+    @Override
+    public T copy(T from) {
+        return from;
+    }
+
+    /** Returns {@code from} as is; {@code reuse} is ignored. */
+    @Override
+    public T copy(T from, T reuse) {
+        return from;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
new file mode 100644
index 000000000..fa83aad83
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Bounded {@link Source} that emits the {@link ManifestEntry}s returned by a partition-filtered
+ * scan of the table. It does not monitor new snapshots.
+ */
+public class FileIndexScanSource
+        implements Source<
+                ManifestEntry, FileIndexScanSource.Split, 
FileIndexScanSource.CheckpointState> {
+
+    private static final long serialVersionUID = 2319102734891237489L;
+
+    private final FileStoreTable table;
+    @Nullable private final Predicate partitionPredicate;
+
+    /**
+     * Creates the source.
+     *
+     * @param table table whose manifest entries are scanned
+     * @param partitionPredicate filter handed to the scan as partition filter; may be {@code
+     *     null}
+     */
+    public FileIndexScanSource(FileStoreTable table, @Nullable Predicate 
partitionPredicate) {
+        this.table = table;
+        this.partitionPredicate = partitionPredicate;
+    }
+
+    /** This source is always {@link Boundedness#BOUNDED}. */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /**
+     * Plans a scan of the table with the partition filter applied and wraps every resulting
+     * manifest entry into its own {@link Split}.
+     */
+    @Override
+    public SplitEnumerator<Split, CheckpointState> createEnumerator(
+            SplitEnumeratorContext<Split> splitEnumeratorContext) throws Exception {
+        List<ManifestEntry> plannedEntries =
+                table.store().newScan().withPartitionFilter(partitionPredicate).plan().files();
+        List<Split> pendingSplits = new ArrayList<>();
+        for (ManifestEntry plannedEntry : plannedEntries) {
+            pendingSplits.add(new Split(plannedEntry));
+        }
+        return new ManifestFileSplitEnumerator(splitEnumeratorContext, pendingSplits);
+    }
+
+    /** Recreates the enumerator from the splits held by {@code checkpointState}. */
+    @Override
+    public SplitEnumerator<Split, CheckpointState> restoreEnumerator(
+            SplitEnumeratorContext<Split> splitEnumeratorContext, CheckpointState checkpointState)
+            throws Exception {
+        List<Split> pendingSplits = checkpointState.files();
+        return new ManifestFileSplitEnumerator(splitEnumeratorContext, pendingSplits);
+    }
+
+    /** Returns a new {@link SplitSerder} for serializing {@link Split}s. */
+    @Override
+    public SimpleVersionedSerializer<Split> getSplitSerializer() {
+        return new SplitSerder();
+    }
+
+    /** Returns a new {@link CheckpointSerde} for serializing the enumerator state. */
+    @Override
+    public SimpleVersionedSerializer<CheckpointState> 
getEnumeratorCheckpointSerializer() {
+        return new CheckpointSerde();
+    }
+
+    /** Creates a {@link Reader} bound to the given reader context. */
+    @Override
+    public SourceReader<ManifestEntry, Split> createReader(SourceReaderContext 
sourceReaderContext)
+            throws Exception {
+        return new Reader(sourceReaderContext);
+    }
+
+    /** State for splits: holds the list of splits handed to it by the enumerator. */
+    public static class CheckpointState {
+
+        // stored by reference, no defensive copy
+        private final List<Split> files;
+
+        /**
+         * @param files splits to keep in this state
+         */
+        public CheckpointState(List<Split> files) {
+            this.files = files;
+        }
+
+        /** Returns the very list passed to the constructor. */
+        public List<Split> files() {
+            return files;
+        }
+    }
+
+    /**
+     * Enumerator that hands out the pending splits one at a time, one per split request, and
+     * signals "no more splits" to a requester once the pending list is empty.
+     */
+    private static class ManifestFileSplitEnumerator
+            implements SplitEnumerator<Split, CheckpointState> {
+
+        private final SplitEnumeratorContext<Split> splitEnumeratorContext;
+        private final List<Split> files;
+
+        public ManifestFileSplitEnumerator(
+                SplitEnumeratorContext<Split> splitEnumeratorContext, List<Split> files) {
+            this.splitEnumeratorContext = splitEnumeratorContext;
+            this.files = files;
+        }
+
+        @Override
+        public void start() {}
+
+        /** Assigns the first pending split to the requester, or tells it there are none left. */
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+            if (files.isEmpty()) {
+                splitEnumeratorContext.signalNoMoreSplits(subtaskId);
+                return;
+            }
+            Split next = files.remove(0);
+            splitEnumeratorContext.assignSplit(next, subtaskId);
+        }
+
+        /** Puts returned splits back at the end of the pending list. */
+        @Override
+        public void addSplitsBack(List<Split> splits, int subtaskId) {
+            files.addAll(splits);
+        }
+
+        @Override
+        public void addReader(int subtaskId) {}
+
+        /** Snapshots the pending list; the state shares the list instance, it is not copied. */
+        @Override
+        public CheckpointState snapshotState(long checkpointId) throws Exception {
+            return new CheckpointState(files);
+        }
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    /** Split to wrap ManifestEntry: one split carries exactly one manifest entry. */
+    public static class Split implements SourceSplit {
+
+        private final ManifestEntry manifestEntry;
+
+        public Split(ManifestEntry manifestEntry) {
+            this.manifestEntry = manifestEntry;
+        }
+
+        /**
+         * Returns an id derived from the wrapped entry (bucket and data file name) instead of one
+         * constant shared by every split.
+         */
+        @Override
+        public String splitId() {
+            // NOTE(review): assumes data file names are unique within one scan - confirm
+            return manifestEntry.bucket() + "-" + manifestEntry.file().fileName();
+        }
+
+        /** Returns the wrapped manifest entry. */
+        ManifestEntry entry() {
+            return manifestEntry;
+        }
+    }
+
+    /** Serializes a {@link Split} as the bytes of its wrapped {@link ManifestEntry}. */
+    private static class SplitSerder implements SimpleVersionedSerializer<Split> {
+
+        // NOTE(review): this single instance is shared by all SplitSerder objects; whether
+        // ManifestEntrySerializer may be used from several threads is not visible here.
+        private static final ManifestEntrySerializer ENTRY_SERIALIZER =
+                new ManifestEntrySerializer();
+
+        public SplitSerder() {}
+
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(Split sourceSplit) throws IOException {
+            ManifestEntry wrapped = sourceSplit.entry();
+            return ENTRY_SERIALIZER.serializeToBytes(wrapped);
+        }
+
+        /** Rebuilds the split from the entry bytes; {@code version} is not inspected. */
+        @Override
+        public Split deserialize(int version, byte[] serialized) throws IOException {
+            ManifestEntry wrapped = ENTRY_SERIALIZER.deserializeFromBytes(serialized);
+            return new Split(wrapped);
+        }
+    }
+
+    /**
+     * Serializer of {@link CheckpointState}. Layout: split count as int, then for every split its
+     * byte length as int followed by the bytes produced by {@link SplitSerder}.
+     */
+    private static class CheckpointSerde implements SimpleVersionedSerializer<CheckpointState> {
+
+        private final SplitSerder splitSerder = new SplitSerder();
+
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        /**
+         * Writes all splits of the state into a byte array.
+         *
+         * @return the serialized state; it used to be an empty array, which dropped every pending
+         *     split and made {@link #deserialize(int, byte[])} fail on restore
+         */
+        @Override
+        public byte[] serialize(CheckpointState checkpointState) throws IOException {
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            DataOutputStream dataOutput = new DataOutputStream(byteArrayOutputStream);
+            List<Split> files = checkpointState.files();
+            dataOutput.writeInt(files.size());
+            for (Split file : files) {
+                byte[] b = splitSerder.serialize(file);
+                dataOutput.writeInt(b.length);
+                dataOutput.write(b);
+            }
+            dataOutput.flush();
+            return byteArrayOutputStream.toByteArray();
+        }
+
+        /** Reads back the layout written by {@link #serialize(CheckpointState)}. */
+        @Override
+        public CheckpointState deserialize(int i, byte[] bytes) throws IOException {
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+            DataInput dataInput = new DataInputStream(byteArrayInputStream);
+            int size = dataInput.readInt();
+            List<Split> files = new ArrayList<>();
+            for (int j = 0; j < size; j++) {
+                byte[] b = new byte[dataInput.readInt()];
+                dataInput.readFully(b);
+                files.add(splitSerder.deserialize(splitSerder.getVersion(), b));
+            }
+            return new CheckpointState(files);
+        }
+    }
+
+    /**
+     * Reader for data metafile split: emits the manifest entry of each assigned split and
+     * requests a new split whenever its local queue runs empty.
+     */
+    private static class Reader implements SourceReader<ManifestEntry, Split> {
+
+        private final SourceReaderContext context;
+        private final ArrayDeque<Split> splits;
+
+        private boolean noMore;
+        // Completed whenever splits arrive or "no more splits" is signalled; replaced by a fresh
+        // future the next time the reader has to wait.
+        // NOTE(review): assumes all reader methods are called from one thread - confirm
+        private CompletableFuture<Void> availability;
+
+        public Reader(SourceReaderContext sourceReaderContext) {
+            this.context = sourceReaderContext;
+            this.splits = new ArrayDeque<>();
+            this.availability = new CompletableFuture<>();
+        }
+
+        /** Sends the first split request. */
+        @Override
+        public void start() {
+            context.sendSplitRequest();
+        }
+
+        /**
+         * Emits at most one entry per call. When the queue becomes empty and more splits may
+         * still come, a new split request is sent.
+         */
+        @Override
+        public InputStatus pollNext(ReaderOutput<ManifestEntry> readerOutput) throws Exception {
+            if (!splits.isEmpty()) {
+                readerOutput.collect(splits.poll().entry());
+                if (!noMore && splits.isEmpty()) {
+                    context.sendSplitRequest();
+                }
+                if (!splits.isEmpty()) {
+                    return InputStatus.MORE_AVAILABLE;
+                }
+            }
+            return noMore ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
+        }
+
+        /** Returns the splits that are assigned but not yet emitted. */
+        @Override
+        public List<Split> snapshotState(long l) {
+            return new ArrayList<>(splits);
+        }
+
+        /**
+         * Returns a completed future when there is something to do (queued splits, or end of
+         * input to report). Otherwise returns a pending future that completes when splits are
+         * added or "no more splits" is signalled, so the caller does not spin on
+         * {@code pollNext} while waiting.
+         */
+        @Override
+        public CompletableFuture<Void> isAvailable() {
+            if (!splits.isEmpty() || noMore) {
+                return FutureCompletingBlockingQueue.AVAILABLE;
+            }
+            if (availability.isDone()) {
+                availability = new CompletableFuture<>();
+            }
+            return availability;
+        }
+
+        @Override
+        public void addSplits(List<Split> list) {
+            splits.addAll(list);
+            availability.complete(null);
+        }
+
+        @Override
+        public void notifyNoMoreSplits() {
+            noMore = true;
+            // wake up a waiting caller so that END_OF_INPUT gets reported
+            availability.complete(null);
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 847390dac..2fe8403dc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -32,6 +32,7 @@ org.apache.paimon.flink.action.QueryServiceActionFactory
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
 org.apache.paimon.flink.procedure.CompactProcedure
+org.apache.paimon.flink.procedure.FileIndexProcedure
 org.apache.paimon.flink.procedure.CreateTagProcedure
 org.apache.paimon.flink.procedure.DeleteTagProcedure
 org.apache.paimon.flink.procedure.CreateBranchProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
new file mode 100644
index 000000000..1fbbb5869
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/** IT Case for {@link FileIndexProcedure}. */
+public class FileIndexProcedureITCase extends CatalogITCaseBase {
+
+    /**
+     * Configures bloom-filter indexes on {@code k} and {@code v} after the data was written, runs
+     * the rewrite procedure, and checks that every data file carries exactly one index file that
+     * answers equality probes on both columns.
+     */
+    @Test
+    public void testFileIndexProcedureAddIndex() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v STRING,"
+                        + " hh INT,"
+                        + " dt STRING"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '-1'"
+                        + ")");
+
+        sql(
+                "INSERT INTO T VALUES (1, '100', 15, '20221208'),"
+                        + " (1, '100', 16, '20221208'),"
+                        + " (1, '100', 15, '20221209')");
+
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')");
+        sql("CALL sys.file_index_rewrite('default.T')");
+
+        FileStoreTable table = paimonTable("T");
+        List<ManifestEntry> entries = table.store().newScan().plan().files();
+
+        for (ManifestEntry entry : entries) {
+            Path indexFilePath = singleIndexFilePath(table, entry);
+            try (FileIndexFormat.Reader reader =
+                    FileIndexFormat.createReader(
+                            table.fileIO().newInputStream(indexFilePath), table.rowType())) {
+                PredicateBuilder builder = new PredicateBuilder(table.rowType());
+
+                Set<FileIndexReader> readerSetK = reader.readColumnIndex("k");
+                Assertions.assertThat(readerSetK).hasSize(1);
+                // k = 1 was written, k = 4 was not
+                assertRemain(readerSetK, builder.equal(0, 1), true);
+                assertRemain(readerSetK, builder.equal(0, 4), false);
+
+                Set<FileIndexReader> readerSetV = reader.readColumnIndex("v");
+                Assertions.assertThat(readerSetV).hasSize(1);
+                // v = '100' was written, v = '101' was not
+                assertRemain(readerSetV, builder.equal(1, BinaryString.fromString("100")), true);
+                assertRemain(readerSetV, builder.equal(1, BinaryString.fromString("101")), false);
+            }
+        }
+    }
+
+    /**
+     * Renames a column between two inserts, then checks that a filter matching no row reads all
+     * rows before the index rewrite and none after it.
+     */
+    @Test
+    public void testFileIndexProcedureSchemaEvolution() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v STRING,"
+                        + " hh INT,"
+                        + " dt STRING"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'file.format' = 'avro',"
+                        + " 'bucket' = '-1'"
+                        + ")");
+
+        String insertRows =
+                "INSERT INTO T VALUES (1, '10', 15, '20221208'),"
+                        + " (4, '100', 16, '20221208'),"
+                        + " (5, '1000', 15, '20221209')";
+
+        sql(insertRows);
+
+        sql("ALTER TABLE T RENAME `k` TO order_id");
+
+        sql(insertRows);
+
+        FileStoreTable table = paimonTable("T");
+
+        Predicate predicateK = new PredicateBuilder(table.rowType()).equal(0, 2);
+        Predicate predicateV =
+                new PredicateBuilder(table.rowType()).equal(1, BinaryString.fromString("101"));
+        RecordReader<InternalRow> reader =
+                table.newRead()
+                        .withFilter(PredicateBuilder.and(predicateK, predicateV))
+                        .createReader(table.newScan().plan());
+        AtomicInteger count = new AtomicInteger(0);
+        reader.forEachRemaining(r -> count.incrementAndGet());
+
+        // no file index is configured yet (the table uses avro, not parquet), so all 6 written
+        // rows are expected to come back from the reader
+        Assertions.assertThat(count.get()).isEqualTo(6);
+
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='order_id,v')");
+        sql("CALL sys.file_index_rewrite('default.T')");
+
+        reader =
+                table.newRead()
+                        .withFilter(PredicateBuilder.and(predicateK, predicateV))
+                        .createReader(table.newScan().plan());
+        count.set(0);
+        reader.forEachRemaining(r -> count.incrementAndGet());
+
+        // every data file is expected to be filtered out, leaving no records
+        Assertions.assertThat(count.get()).isEqualTo(0);
+    }
+
+    /**
+     * Narrows the indexed columns from {@code k,v} to {@code k} and checks that the rewritten index
+     * file has no index for {@code v}; then resets the option and checks that no index file is
+     * left on any data file.
+     */
+    @Test
+    public void testFileIndexProcedureDropIndex() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v STRING,"
+                        + " hh INT,"
+                        + " dt STRING"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'file-index.bloom-filter.columns' = 'k,v',"
+                        + " 'bucket' = '-1'"
+                        + ")");
+
+        sql(
+                "INSERT INTO T VALUES (1, '100', 15, '20221208'),"
+                        + " (1, '100', 16, '20221208'),"
+                        + " (1, '100', 15, '20221209')");
+
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k')");
+        sql("CALL sys.file_index_rewrite('default.T')");
+
+        FileStoreTable table = paimonTable("T");
+        List<ManifestEntry> entries = table.store().newScan().plan().files();
+
+        for (ManifestEntry entry : entries) {
+            Path indexFilePath = singleIndexFilePath(table, entry);
+            try (FileIndexFormat.Reader reader =
+                    FileIndexFormat.createReader(
+                            table.fileIO().newInputStream(indexFilePath), table.rowType())) {
+                // column v is no longer configured, so its index must be gone
+                Set<FileIndexReader> readerSetV = reader.readColumnIndex("v");
+
+                Assertions.assertThat(readerSetV).isEmpty();
+            }
+        }
+
+        sql("ALTER TABLE T RESET ('file-index.bloom-filter.columns')");
+
+        sql("CALL sys.file_index_rewrite('default.T')");
+
+        table = paimonTable("T");
+        entries = table.store().newScan().plan().files();
+        for (ManifestEntry entry : entries) {
+            Assertions.assertThat(indexFileNames(entry)).isEmpty();
+        }
+    }
+
+    /** Returns the extra files of the entry's data file whose name ends with the index suffix. */
+    private static List<String> indexFileNames(ManifestEntry entry) {
+        return entry.file().extraFiles().stream()
+                .filter(s -> s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+                .collect(Collectors.toList());
+    }
+
+    /** Asserts that the entry has exactly one index file and resolves it to its full path. */
+    private static Path singleIndexFilePath(FileStoreTable table, ManifestEntry entry) {
+        List<String> indexFiles = indexFileNames(entry);
+        Assertions.assertThat(indexFiles).hasSize(1);
+
+        return table.store()
+                .pathFactory()
+                .createDataFilePathFactory(entry.partition(), entry.bucket())
+                .toPath(indexFiles.get(0));
+    }
+
+    /** Asserts that every reader answers the predicate with the expected {@code remain()} value. */
+    private static void assertRemain(
+            Set<FileIndexReader> readers, Predicate predicate, boolean expected) {
+        for (FileIndexReader fileIndexReader : readers) {
+            Assertions.assertThat(predicate.visit(fileIndexReader).remain()).isEqualTo(expected);
+        }
+    }
+}

Reply via email to