This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e155308fe [flink] Adjust file index rewriter
e155308fe is described below

commit e155308fe407cec515ac38bf50eac4501039bea0
Author: Jingsong <[email protected]>
AuthorDate: Thu Jun 20 14:49:01 2024 +0800

    [flink] Adjust file index rewriter
---
 docs/content/append-table/file-index.md            |  61 +++++++++++
 docs/content/concepts/specification.md             |  34 +-----
 docs/content/flink/procedures.md                   |  10 +-
 ...leIndexWriter.java => DataFileIndexWriter.java} |  98 +++++++++--------
 .../org/apache/paimon/io/DataFilePathFactory.java  |  10 +-
 .../org/apache/paimon/io/RowDataFileWriter.java    |  24 ++--
 .../paimon/manifest/ManifestEntrySerializer.java   |  18 ---
 .../org/apache/paimon/utils/ObjectSerializer.java  |  13 +++
 ...ocedure.java => RewriteFileIndexProcedure.java} |  40 +++----
 ...ileIndexSink.java => RewriteFileIndexSink.java} | 122 ++++++++++++---------
 ...ScanSource.java => RewriteFileIndexSource.java} |  10 +-
 .../services/org.apache.paimon.factories.Factory   |   2 +-
 ...e.java => RewriteFileIndexProcedureITCase.java} |  12 +-
 13 files changed, 249 insertions(+), 205 deletions(-)

diff --git a/docs/content/append-table/file-index.md 
b/docs/content/append-table/file-index.md
new file mode 100644
index 000000000..4bf769d6e
--- /dev/null
+++ b/docs/content/append-table/file-index.md
@@ -0,0 +1,61 @@
+---
+title: "File Index"
+weight: 4
+type: docs
+aliases:
+- /append-table/file-index.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Data File Index
+
+When `file-index.bloom-filter.columns` is defined, Paimon will create a corresponding 
index file for each data file. If the index
+file is small enough, it will be stored directly in the manifest; otherwise it is stored in the 
directory of the data file. Each data file
+corresponds to an index file, which has a separate file definition and can 
contain different types of indexes with
+multiple columns.
+
+## Concept
+
+Data file index is an external index file corresponding to a certain data 
file. If the index file is too small, it will
+be stored directly in the manifest, otherwise in the directory of the data 
file. Each data file corresponds to an index file,
+which has a separate file definition and can contain different types of 
indexes with multiple columns.
+
+## Usage
+
+Different file indexes may be efficient in different scenarios. For example, a bloom 
filter may speed up queries in point lookup
+scenarios. Using a bitmap may consume more space but can result in greater 
accuracy. Although only the bloom filter is implemented
+currently, other types of index will be supported in the future.
+
+Currently, file index is only supported in append-only table.
+
+`Bloom Filter`:
+* `file-index.bloom-filter.columns`: specify the columns that need bloom 
filter index.
+* `file-index.bloom-filter.<column_name>.fpp` to config false positive 
probability.
+* `file-index.bloom-filter.<column_name>.items` to config the expected 
distinct items in one data file.
+
+More filter types will be supported...
+
+## Procedure
+
+If you want to add a file index to an existing table, without any rewrite, you can 
use the `rewrite_file_index` procedure. Before
+using the procedure, you should set the appropriate configurations on the target 
table. You can use an ALTER clause to set
+`file-index.<filter-type>.columns` on the table.
+
+How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
diff --git a/docs/content/concepts/specification.md 
b/docs/content/concepts/specification.md
index 8463b51ed..3116f2eed 100644
--- a/docs/content/concepts/specification.md
+++ b/docs/content/concepts/specification.md
@@ -247,33 +247,7 @@ Global Index is in the index directory, currently, only 
two places will use glob
 
 ## Data File Index
 
-### Concept
-
-Data file index is an external index file corresponding to a certain data 
file. If the index file is too small, it will
-be stored directly in the manifest, otherwise in the directory of the data 
file. Each data file corresponds to an index file, 
-which has a separate file definition and can contain different types of 
indexes with multiple columns.
-
-### Usage
-
-Different file index may be efficient in different scenario. For example bloom 
filter may speed up query in point lookup
-scenario. Using a bitmap may consume more space but can result in greater 
accuracy. Though we only realize bloom filter 
-currently, but other types of index will be supported in the future. 
-
-Currently, file index is only supported in append-only table.
-
-`Bloom Filter`:
-* `file-index.bloom-filter.columns`: specify the columns that need bloom 
filter index.
-* `file-index.bloom-filter.<column_name>.fpp` to config false positive 
probability.
-* `file-index.bloom-filter.<column_name>.items` to config the expected 
distinct items in one data file.
-
-
-More filter types will be supported...
-
-### Procedure
-
-If you want to add file index to existing table, without any rewrite, you can 
use `file_index_rewrite` procedure. Before
-we use the procedure, you should config appropriate configurations in target 
table. You can use ALTER clause to config 
-`file-index.<filter-type>.columns` to the table. 
-
-How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
-
+When `file-index.bloom-filter.columns` is defined, Paimon will create a corresponding 
index file for each data file. If the index
+file is small enough, it will be stored directly in the manifest; otherwise it is stored in the 
directory of the data file. Each data file
+corresponds to an index file, which has a separate file definition and can 
contain different types of indexes with
+multiple columns.
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index c855ee1bf..da7b7ff5b 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -275,20 +275,20 @@ All available procedures are listed below.
       <td>CALL sys.repair('test_db.T')</td>
    </tr>
     <tr>
-      <td>file_index_rewrite</td>
+      <td>rewrite_file_index</td>
       <td>
-         CALL sys.file_index_rewrite(&ltidentifier&gt [, 
&ltpartitions&gt])<br/><br/>
+         CALL sys.rewrite_file_index(&ltidentifier&gt [, 
&ltpartitions&gt])<br/><br/>
       </td>
       <td>
          Rewrite the file index for the table. Argument:
             <li>identifier: &ltdatabaseName&gt.&lttableName&gt.</li>
-            <li>partitions : partition filter.</li>
+            <li>partitions : specific partitions.</li>
       </td>
       <td>
          -- rewrite the file index for the whole table<br/>
-         CALL sys.file_index_rewrite('test_db.T')<br/><br/>
+         CALL sys.rewrite_file_index('test_db.T')<br/><br/>
          -- repair all tables in a specific partition<br/>
-         CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
+         CALL sys.rewrite_file_index('test_db.T', 'pt=a')<br/><br/>
      </td>
    </tr>
    </tbody>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
similarity index 78%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
rename to 
paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
index c21d5418a..83d0cf078 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexCommon;
 import org.apache.paimon.fileindex.FileIndexFormat;
 import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.FileIndexWriter;
 import org.apache.paimon.fileindex.FileIndexer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -45,8 +46,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Index file writer. */
-public final class FileIndexWriter implements Closeable {
+/** Index file writer for a data file. */
+public final class DataFileIndexWriter implements Closeable {
 
     public static final FileIndexResult EMPTY_RESULT = 
FileIndexResult.of(null, null);
 
@@ -63,12 +64,12 @@ public final class FileIndexWriter implements Closeable {
 
     private byte[] embeddedIndexBytes;
 
-    public FileIndexWriter(
+    public DataFileIndexWriter(
             FileIO fileIO,
             Path path,
             RowType rowType,
             FileIndexOptions fileIndexOptions,
-            @Nullable Map<String, String> evolutionMap) {
+            @Nullable Map<String, String> colNameMapping) {
         this.fileIO = fileIO;
         this.path = path;
         List<DataField> fields = rowType.getFields();
@@ -82,15 +83,15 @@ public final class FileIndexWriter implements Closeable {
         for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
                 fileIndexOptions.entrySet()) {
             FileIndexOptions.Column entryColumn = entry.getKey();
-            String tempName = entryColumn.getColumnName();
-            if (evolutionMap != null) {
-                tempName = evolutionMap.getOrDefault(tempName, null);
-                if (tempName == null) {
+            String colName = entryColumn.getColumnName();
+            if (colNameMapping != null) {
+                colName = colNameMapping.getOrDefault(colName, null);
+                if (colName == null) {
                     continue;
                 }
             }
 
-            final String columnName = tempName;
+            String columnName = colName;
             DataField field = map.get(columnName);
             if (field == null) {
                 throw new IllegalArgumentException(columnName + " does not 
exist in column fields");
@@ -98,6 +99,7 @@ public final class FileIndexWriter implements Closeable {
 
             for (Map.Entry<String, Options> typeEntry : 
entry.getValue().entrySet()) {
                 String indexType = typeEntry.getKey();
+                IndexMaintainer maintainer = indexMaintainers.get(columnName);
                 if (entryColumn.isNestedColumn()) {
                     if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
                         throw new IllegalArgumentException(
@@ -105,34 +107,36 @@ public final class FileIndexWriter implements Closeable {
                                         + columnName
                                         + " is nested column, but is not map 
type. Only should map type yet.");
                     }
-                    MapType mapType = (MapType) field.type();
-                    ((MapFileIndexMaintainer)
-                                    indexMaintainers.computeIfAbsent(
-                                            columnName,
-                                            name ->
-                                                    new MapFileIndexMaintainer(
-                                                            columnName,
-                                                            indexType,
-                                                            
mapType.getKeyType(),
-                                                            
mapType.getValueType(),
-                                                            
fileIndexOptions.getMapTopLevelOptions(
-                                                                    
columnName, typeEntry.getKey()),
-                                                            
index.get(columnName))))
-                            .add(entryColumn.getNestedColumnName(), 
typeEntry.getValue());
+                    MapFileIndexMaintainer mapMaintainer = 
(MapFileIndexMaintainer) maintainer;
+                    if (mapMaintainer == null) {
+                        MapType mapType = (MapType) field.type();
+                        mapMaintainer =
+                                new MapFileIndexMaintainer(
+                                        columnName,
+                                        indexType,
+                                        mapType.getKeyType(),
+                                        mapType.getValueType(),
+                                        fileIndexOptions.getMapTopLevelOptions(
+                                                columnName, 
typeEntry.getKey()),
+                                        index.get(columnName));
+                        indexMaintainers.put(columnName, mapMaintainer);
+                    }
+                    mapMaintainer.add(entryColumn.getNestedColumnName(), 
typeEntry.getValue());
                 } else {
-                    indexMaintainers.computeIfAbsent(
-                            columnName,
-                            name ->
-                                    new FileIndexMaintainer(
-                                            columnName,
-                                            indexType,
-                                            FileIndexer.create(
-                                                            indexType,
-                                                            field.type(),
-                                                            
typeEntry.getValue())
-                                                    .createWriter(),
-                                            InternalRow.createFieldGetter(
-                                                    field.type(), 
index.get(columnName))));
+                    if (maintainer == null) {
+                        maintainer =
+                                new FileIndexMaintainer(
+                                        columnName,
+                                        indexType,
+                                        FileIndexer.create(
+                                                        indexType,
+                                                        field.type(),
+                                                        typeEntry.getValue())
+                                                .createWriter(),
+                                        InternalRow.createFieldGetter(
+                                                field.type(), 
index.get(columnName)));
+                        indexMaintainers.put(columnName, maintainer);
+                    }
                 }
             }
         }
@@ -149,18 +153,18 @@ public final class FileIndexWriter implements Closeable {
     public void close() throws IOException {
         Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();
 
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (FileIndexFormat.Writer writer = 
FileIndexFormat.createWriter(baos)) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (FileIndexFormat.Writer writer = 
FileIndexFormat.createWriter(out)) {
             writer.writeColumnIndexes(indexMaps);
         }
 
-        if (baos.size() > inManifestThreshold) {
+        if (out.size() > inManifestThreshold) {
             try (OutputStream outputStream = fileIO.newOutputStream(path, 
true)) {
-                outputStream.write(baos.toByteArray());
+                outputStream.write(out.toByteArray());
             }
             resultFileName = path.getName();
         } else {
-            embeddedIndexBytes = baos.toByteArray();
+            embeddedIndexBytes = out.toByteArray();
         }
     }
 
@@ -182,21 +186,21 @@ public final class FileIndexWriter implements Closeable {
     }
 
     @Nullable
-    public static FileIndexWriter create(
+    public static DataFileIndexWriter create(
             FileIO fileIO, Path path, RowType rowType, FileIndexOptions 
fileIndexOptions) {
         return create(fileIO, path, rowType, fileIndexOptions, null);
     }
 
     @Nullable
-    public static FileIndexWriter create(
+    public static DataFileIndexWriter create(
             FileIO fileIO,
             Path path,
             RowType rowType,
             FileIndexOptions fileIndexOptions,
-            @Nullable Map<String, String> evolutionMap) {
+            @Nullable Map<String, String> colNameMapping) {
         return fileIndexOptions.isEmpty()
                 ? null
-                : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, 
evolutionMap);
+                : new DataFileIndexWriter(fileIO, path, rowType, 
fileIndexOptions, colNameMapping);
     }
 
     /** File index result. */
@@ -238,13 +242,13 @@ public final class FileIndexWriter implements Closeable {
 
         private final String columnName;
         private final String indexType;
-        private final org.apache.paimon.fileindex.FileIndexWriter 
fileIndexWriter;
+        private final FileIndexWriter fileIndexWriter;
         private final InternalRow.FieldGetter getter;
 
         public FileIndexMaintainer(
                 String columnName,
                 String indexType,
-                org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
+                FileIndexWriter fileIndexWriter,
                 InternalRow.FieldGetter getter) {
             this.columnName = columnName;
             this.indexType = indexType;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index f7da2760e..6c7090f3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -72,11 +72,11 @@ public class DataFilePathFactory {
         return uuid;
     }
 
-    public static Path toFileIndexPath(Path filePath) {
-        return new Path(filePath.getParent(), filePath.getName() + 
INDEX_PATH_SUFFIX);
+    public static Path dataFileToFileIndexPath(Path dataFilePath) {
+        return new Path(dataFilePath.getParent(), dataFilePath.getName() + 
INDEX_PATH_SUFFIX);
     }
 
-    public static Path fileIndexPathIncrease(Path filePath) {
+    public static Path createNewFileIndexFilePath(Path filePath) {
         String fileName = filePath.getName();
         int dot = fileName.lastIndexOf(".");
         int dash = fileName.lastIndexOf("-");
@@ -87,8 +87,8 @@ public class DataFilePathFactory {
                 return new Path(
                         filePath.getParent(),
                         fileName.substring(0, dash + 1) + (num + 1) + 
INDEX_PATH_SUFFIX);
-            } catch (NumberFormatException e) {
-                // ignore
+            } catch (NumberFormatException ignore) {
+                // it is the first index file, has no number
             }
         }
         return new Path(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 3da909817..ae18768c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.function.Function;
 
-import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
 
 /**
  * A {@link StatsCollectingSingleFileWriter} to write data files containing 
{@link InternalRow}.
@@ -48,7 +48,7 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
     private final long schemaId;
     private final LongCounter seqNumCounter;
     private final SimpleStatsConverter statsArraySerializer;
-    @Nullable private final FileIndexWriter fileIndexWriter;
+    @Nullable private final DataFileIndexWriter dataFileIndexWriter;
     private final FileSource fileSource;
 
     public RowDataFileWriter(
@@ -75,9 +75,9 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.statsArraySerializer = new SimpleStatsConverter(writeSchema);
-        this.fileIndexWriter =
-                FileIndexWriter.create(
-                        fileIO, toFileIndexPath(path), writeSchema, 
fileIndexOptions);
+        this.dataFileIndexWriter =
+                DataFileIndexWriter.create(
+                        fileIO, dataFileToFileIndexPath(path), writeSchema, 
fileIndexOptions);
         this.fileSource = fileSource;
     }
 
@@ -85,16 +85,16 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
     public void write(InternalRow row) throws IOException {
         super.write(row);
         // add row to index if needed
-        if (fileIndexWriter != null) {
-            fileIndexWriter.write(row);
+        if (dataFileIndexWriter != null) {
+            dataFileIndexWriter.write(row);
         }
         seqNumCounter.add(1L);
     }
 
     @Override
     public void close() throws IOException {
-        if (fileIndexWriter != null) {
-            fileIndexWriter.close();
+        if (dataFileIndexWriter != null) {
+            dataFileIndexWriter.close();
         }
         super.close();
     }
@@ -102,8 +102,10 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
     @Override
     public DataFileMeta result() throws IOException {
         SimpleStats stats = statsArraySerializer.toBinary(fieldStats());
-        FileIndexWriter.FileIndexResult indexResult =
-                fileIndexWriter == null ? FileIndexWriter.EMPTY_RESULT : 
fileIndexWriter.result();
+        DataFileIndexWriter.FileIndexResult indexResult =
+                dataFileIndexWriter == null
+                        ? DataFileIndexWriter.EMPTY_RESULT
+                        : dataFileIndexWriter.result();
         return DataFileMeta.forAppend(
                 path.getName(),
                 fileIO.getFileSize(path),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index 6d3e1a961..83c2be59a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -23,13 +23,8 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.io.DataInputViewStreamWrapper;
-import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.util.function.Function;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -82,19 +77,6 @@ public class ManifestEntrySerializer extends 
VersionedObjectSerializer<ManifestE
                 dataFileMetaSerializer.fromRow(row.getRow(4, 
dataFileMetaSerializer.numFields())));
     }
 
-    public byte[] serializeToBytes(ManifestEntry manifestEntry) throws 
IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
-        serialize(manifestEntry, view);
-        return out.toByteArray();
-    }
-
-    public ManifestEntry deserializeFromBytes(byte[] bytes) throws IOException 
{
-        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
-        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
-        return deserialize(view);
-    }
-
     public static Function<InternalRow, BinaryRow> partitionGetter() {
         return row -> deserializeBinaryRow(row.getBinary(2));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
index 329dee230..a83d80549 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
@@ -112,6 +112,19 @@ public abstract class ObjectSerializer<T> implements 
Serializable {
         return deserializeList(view);
     }
 
+    public byte[] serializeToBytes(T record) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+        serialize(record, view);
+        return out.toByteArray();
+    }
+
+    public T deserializeFromBytes(byte[] bytes) throws IOException {
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+        return deserialize(view);
+    }
+
     /** Convert a {@link T} to {@link InternalRow}. */
     public abstract InternalRow toRow(T record);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
similarity index 83%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
index 21582ae28..a85094b9a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.FileIndexSink;
 import 
org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
-import org.apache.paimon.flink.source.FileIndexScanSource;
+import org.apache.paimon.flink.sink.RewriteFileIndexSink;
+import org.apache.paimon.flink.source.RewriteFileIndexSource;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.predicate.Predicate;
@@ -46,12 +46,12 @@ import java.util.Map;
 
 import static org.apache.paimon.utils.ParameterUtils.getPartitions;
 
-/** Migrate procedure to migrate hive table to paimon table. */
-public class FileIndexProcedure extends ProcedureBase {
+/** Rewrite file index procedure to re-generate all file indexes. */
+public class RewriteFileIndexProcedure extends ProcedureBase {
 
     @Override
     public String identifier() {
-        return "file_index_rewrite";
+        return "rewrite_file_index";
     }
 
     public String[] call(ProcedureContext procedureContext, String 
sourceTablePath)
@@ -89,30 +89,20 @@ public class FileIndexProcedure extends ProcedureBase {
             partitionPredicate = null;
         }
 
-        TopoBuilder.build(env, (FileStoreTable) table, partitionPredicate);
-
+        FileStoreTable storeTable = (FileStoreTable) table;
+        DataStreamSource<ManifestEntry> source =
+                env.fromSource(
+                        new RewriteFileIndexSource(storeTable, 
partitionPredicate),
+                        WatermarkStrategy.noWatermarks(),
+                        "index source",
+                        new ManifestEntryTypeInfo());
+        new RewriteFileIndexSink(storeTable).sinkFrom(source);
         return execute(env, "Add file index for table: " + sourceTablePath);
     }
 
-    private static class TopoBuilder {
-
-        public static void build(
-                StreamExecutionEnvironment env,
-                FileStoreTable table,
-                Predicate partitionPredicate) {
-            DataStreamSource<ManifestEntry> source =
-                    env.fromSource(
-                            new FileIndexScanSource(table, partitionPredicate),
-                            WatermarkStrategy.noWatermarks(),
-                            "index source",
-                            new TypeInfo());
-            new FileIndexSink(table).sinkFrom(source);
-        }
-    }
-
-    private static class TypeInfo extends GenericTypeInfo<ManifestEntry> {
+    private static class ManifestEntryTypeInfo extends 
GenericTypeInfo<ManifestEntry> {
 
-        public TypeInfo() {
+        public ManifestEntryTypeInfo() {
             super(ManifestEntry.class);
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
similarity index 78%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index a69f3f23c..39dcca03c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -23,14 +23,14 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexCommon;
 import org.apache.paimon.fileindex.FileIndexFormat;
 import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.flink.procedure.FileIndexProcedure;
+import org.apache.paimon.flink.procedure.RewriteFileIndexProcedure;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileIndexWriter;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.FileIndexWriter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
@@ -45,13 +45,14 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -64,13 +65,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.io.DataFilePathFactory.fileIndexPathIncrease;
-import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+import static 
org.apache.paimon.io.DataFilePathFactory.createNewFileIndexFilePath;
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
 
-/** File index sink for {@link FileIndexProcedure}. */
-public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
+/** File index sink for {@link RewriteFileIndexProcedure}. */
+public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
 
-    public FileIndexSink(FileStoreTable table) {
+    public RewriteFileIndexSink(FileStoreTable table) {
         super(table, null);
     }
 
@@ -84,6 +85,8 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
     private static class FileIndexModificationOperator
             extends PrepareCommitOperator<ManifestEntry, Committable> {
 
+        private static final long serialVersionUID = 1L;
+
         private final FileStoreTable table;
 
         private transient FileIndexProcessor fileIndexProcessor;
@@ -145,7 +148,7 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
         private final FileIO fileIO;
         private final FileStorePathFactory pathFactory;
         private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap;
-        private final SchemaCache schemaCache;
+        private final SchemaCache schemaInfoCache;
         private final long sizeInMeta;
 
         public FileIndexProcessor(FileStoreTable table) {
@@ -154,7 +157,7 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
             this.fileIO = table.fileIO();
             this.pathFactory = table.store().pathFactory();
             this.dataFilePathFactoryMap = new HashMap<>();
-            this.schemaCache =
+            this.schemaInfoCache =
                     new SchemaCache(fileIndexOptions, new 
SchemaManager(fileIO, table.location()));
             this.sizeInMeta = 
table.coreOptions().fileIndexInManifestThreshold();
         }
@@ -166,13 +169,7 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
                             Pair.of(partition, bucket),
                             p -> 
pathFactory.createDataFilePathFactory(partition, bucket));
 
-            Tuple4<RowType, Map<String, String>, int[], Map<String, 
Set<String>>> t4 =
-                    schemaCache.schemaInfo(dataFileMeta.schemaId());
-            RowType fileSchema = t4.f0;
-            Map<String, String> evolutionNameMap = t4.f1;
-            int[] projection = t4.f2;
-            Map<String, Set<String>> expectedFileIndex = t4.f3;
-
+            SchemaInfo schemaInfo = 
schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
             List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
             List<String> indexFiles =
                     dataFileMeta.extraFiles().stream()
@@ -188,20 +185,22 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
                 try (FileIndexFormat.Reader indexReader =
                         FileIndexFormat.createReader(
                                 
fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)),
-                                fileSchema)) {
+                                schemaInfo.fileSchema)) {
                     maintainers = indexReader.readAll();
                 }
-                newIndexPath = 
fileIndexPathIncrease(dataFilePathFactory.toPath(indexFile));
+                newIndexPath = 
createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile));
             } else {
                 maintainers = new HashMap<>();
-                newIndexPath = 
toFileIndexPath(dataFilePathFactory.toPath(dataFileMeta.fileName()));
+                newIndexPath =
+                        dataFileToFileIndexPath(
+                                
dataFilePathFactory.toPath(dataFileMeta.fileName()));
             }
 
             // remove unnecessary
             for (Map.Entry<String, Map<String, byte[]>> entry :
                     new HashSet<>(maintainers.entrySet())) {
                 String name = entry.getKey();
-                if (!expectedFileIndex.containsKey(name)) {
+                if (!schemaInfo.projectedColFullNames.contains(name)) {
                     maintainers.remove(name);
                 } else {
                     Map<String, byte[]> indexTypeBytes = maintainers.get(name);
@@ -213,18 +212,19 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
                 }
             }
 
-            // ignore close
-            FileIndexWriter fileIndexWriter =
-                    FileIndexWriter.create(
+            // ignore close, do not close to write file, only collect 
serialized maintainers
+            @SuppressWarnings("resource")
+            DataFileIndexWriter dataFileIndexWriter =
+                    DataFileIndexWriter.create(
                             fileIO,
                             newIndexPath,
-                            fileSchema.project(projection),
+                            
schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols),
                             fileIndexOptions,
-                            evolutionNameMap);
-            if (fileIndexWriter != null) {
+                            schemaInfo.colNameMapping);
+            if (dataFileIndexWriter != null) {
                 try (RecordReader<InternalRow> reader =
                         table.newReadBuilder()
-                                .withProjection(projection)
+                                .withProjection(schemaInfo.projectedIndexCols)
                                 .newRead()
                                 .createReader(
                                         DataSplit.builder()
@@ -238,10 +238,10 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
                                                         
Collections.singletonList(dataFileMeta))
                                                 .rawConvertible(true)
                                                 .build())) {
-                    reader.forEachRemaining(fileIndexWriter::write);
+                    reader.forEachRemaining(dataFileIndexWriter::write);
                 }
 
-                fileIndexWriter
+                dataFileIndexWriter
                         .serializeMaintainers()
                         .forEach(
                                 (key, value) ->
@@ -276,38 +276,37 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
 
         private final FileIndexOptions fileIndexOptions;
         private final SchemaManager schemaManager;
-        private final TableSchema latest;
-        private final Map<
-                        Long, Tuple4<RowType, Map<String, String>, int[], 
Map<String, Set<String>>>>
-                schemaInfos;
-
+        private final TableSchema currentSchema;
+        private final Map<Long, SchemaInfo> schemaInfos;
         private final Set<Long> fileSchemaIds;
 
         public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager 
schemaManager) {
             this.fileIndexOptions = fileIndexOptions;
             this.schemaManager = schemaManager;
-            this.latest = 
schemaManager.latest().orElseThrow(RuntimeException::new);
+            this.currentSchema = 
schemaManager.latest().orElseThrow(RuntimeException::new);
             this.schemaInfos = new HashMap<>();
             this.fileSchemaIds = new HashSet<>();
         }
 
-        public Tuple4<RowType, Map<String, String>, int[], Map<String, 
Set<String>>> schemaInfo(
-                long schemaId) {
+        public SchemaInfo schemaInfo(long schemaId) {
             if (!fileSchemaIds.contains(schemaId)) {
                 RowType fileSchema = 
schemaManager.schema(schemaId).logicalRowType();
-                Map<String, String> evolutionmap =
-                        schemaId == latest.id()
+
+                @Nullable
+                Map<String, String> colNameMapping =
+                        schemaId == currentSchema.id()
                                 ? null
-                                : createIndexNameMapping(latest.fields(), 
fileSchema.getFields());
+                                : createIndexNameMapping(
+                                        currentSchema.fields(), 
fileSchema.getFields());
 
-                List<String> projectedColumnNames = new ArrayList<>();
-                Map<String, Set<String>> expectedIndexNameType = new 
HashMap<>();
+                List<String> projectedColNames = new ArrayList<>();
+                Set<String> projectedColFullNames = new HashSet<>();
                 for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> 
entry :
                         fileIndexOptions.entrySet()) {
                     FileIndexOptions.Column column = entry.getKey();
                     String columnName;
-                    if (evolutionmap != null) {
-                        columnName = 
evolutionmap.getOrDefault(column.getColumnName(), null);
+                    if (colNameMapping != null) {
+                        columnName = 
colNameMapping.getOrDefault(column.getColumnName(), null);
                         // if column name has no corresponding field, then we 
just skip it
                         if (columnName == null) {
                             continue;
@@ -315,26 +314,24 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
                     } else {
                         columnName = column.getColumnName();
                     }
-                    projectedColumnNames.add(columnName);
+                    projectedColNames.add(columnName);
                     String fullColumnName =
                             column.isNestedColumn()
                                     ? FileIndexCommon.toMapKey(
                                             columnName, 
column.getNestedColumnName())
                                     : column.getColumnName();
-                    expectedIndexNameType
-                            .computeIfAbsent(fullColumnName, name -> new 
HashSet<>())
-                            .addAll(entry.getValue().keySet());
+                    projectedColFullNames.add(fullColumnName);
                 }
 
                 schemaInfos.put(
                         schemaId,
-                        Tuple4.of(
+                        new SchemaInfo(
                                 fileSchema,
-                                evolutionmap,
-                                projectedColumnNames.stream()
+                                colNameMapping,
+                                projectedColNames.stream()
                                         .mapToInt(fileSchema::getFieldIndex)
                                         .toArray(),
-                                expectedIndexNameType));
+                                projectedColFullNames));
                 fileSchemaIds.add(schemaId);
             }
 
@@ -359,4 +356,23 @@ public class FileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
             return indexMapping;
         }
     }
+
+    private static class SchemaInfo {
+
+        private final RowType fileSchema;
+        private final Map<String, String> colNameMapping;
+        private final int[] projectedIndexCols;
+        private final Set<String> projectedColFullNames;
+
+        private SchemaInfo(
+                RowType fileSchema,
+                Map<String, String> colNameMapping,
+                int[] projectedIndexCols,
+                Set<String> projectedColFullNames) {
+            this.fileSchema = fileSchema;
+            this.colNameMapping = colNameMapping;
+            this.projectedIndexCols = projectedIndexCols;
+            this.projectedColFullNames = projectedColFullNames;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java
index fa83aad83..9a488613f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java
@@ -51,16 +51,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /** Bounded {@link FlinkSource} for reading records. It does not monitor new 
snapshots. */
-public class FileIndexScanSource
+public class RewriteFileIndexSource
         implements Source<
-                ManifestEntry, FileIndexScanSource.Split, 
FileIndexScanSource.CheckpointState> {
+                ManifestEntry,
+                RewriteFileIndexSource.Split,
+                RewriteFileIndexSource.CheckpointState> {
 
-    private static final long serialVersionUID = 2319102734891237489L;
+    private static final long serialVersionUID = 1L;
 
     private final FileStoreTable table;
     @Nullable private final Predicate partitionPredicate;
 
-    public FileIndexScanSource(FileStoreTable table, @Nullable Predicate 
partitionPredicate) {
+    public RewriteFileIndexSource(FileStoreTable table, @Nullable Predicate 
partitionPredicate) {
         this.table = table;
         this.partitionPredicate = partitionPredicate;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 2fe8403dc..c41539091 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -32,7 +32,7 @@ org.apache.paimon.flink.action.QueryServiceActionFactory
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
 org.apache.paimon.flink.procedure.CompactProcedure
-org.apache.paimon.flink.procedure.FileIndexProcedure
+org.apache.paimon.flink.procedure.RewriteFileIndexProcedure
 org.apache.paimon.flink.procedure.CreateTagProcedure
 org.apache.paimon.flink.procedure.DeleteTagProcedure
 org.apache.paimon.flink.procedure.CreateBranchProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
index 1fbbb5869..d465287ff 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
@@ -40,8 +40,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-/** IT Case for {@link FileIndexProcedure}. */
-public class FileIndexProcedureITCase extends CatalogITCaseBase {
+/** IT Case for {@link RewriteFileIndexProcedure}. */
+public class RewriteFileIndexProcedureITCase extends CatalogITCaseBase {
 
     @Test
     public void testFileIndexProcedureAddIndex() throws Exception {
@@ -61,7 +61,7 @@ public class FileIndexProcedureITCase extends 
CatalogITCaseBase {
 
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')");
-        sql("CALL sys.file_index_rewrite('default.T')");
+        sql("CALL sys.rewrite_file_index('default.T')");
 
         FileStoreTable table = paimonTable("T");
         List<ManifestEntry> list = table.store().newScan().plan().files();
@@ -156,7 +156,7 @@ public class FileIndexProcedureITCase extends 
CatalogITCaseBase {
 
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         sql("ALTER TABLE T SET 
('file-index.bloom-filter.columns'='order_id,v')");
-        sql("CALL sys.file_index_rewrite('default.T')");
+        sql("CALL sys.rewrite_file_index('default.T')");
 
         reader =
                 table.newRead()
@@ -188,7 +188,7 @@ public class FileIndexProcedureITCase extends 
CatalogITCaseBase {
 
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k')");
-        sql("CALL sys.file_index_rewrite('default.T')");
+        sql("CALL sys.rewrite_file_index('default.T')");
 
         FileStoreTable table = paimonTable("T");
         List<ManifestEntry> list = table.store().newScan().plan().files();
@@ -219,7 +219,7 @@ public class FileIndexProcedureITCase extends 
CatalogITCaseBase {
 
         sql("ALTER TABLE T RESET ('file-index.bloom-filter.columns')");
 
-        sql("CALL sys.file_index_rewrite('default.T')");
+        sql("CALL sys.rewrite_file_index('default.T')");
 
         table = paimonTable("T");
         list = table.store().newScan().plan().files();

Reply via email to