This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7524767e6 [flink] Introduce file index rewriter (#3391)
7524767e6 is described below
commit 7524767e6ff9af15f34b4b6990d726c3e9d9485d
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jun 20 15:45:23 2024 +0800
[flink] Introduce file index rewriter (#3391)
---
docs/content/concepts/specification.md | 34 +-
docs/content/flink/procedures.md | 17 +
.../apache/paimon/fileindex/FileIndexFormat.java | 20 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 20 ++
.../org/apache/paimon/io/DataFilePathFactory.java | 19 ++
.../java/org/apache/paimon/io/FileIndexWriter.java | 54 ++-
.../paimon/manifest/ManifestEntrySerializer.java | 18 +
.../paimon/flink/procedure/FileIndexProcedure.java | 146 +++++++++
.../paimon/flink/sink/CommittableTypeInfo.java | 15 +-
.../paimon/flink/sink/CompactionTaskTypeInfo.java | 16 +-
.../apache/paimon/flink/sink/FileIndexSink.java | 362 +++++++++++++++++++++
.../flink/sink/MultiTableCommittableTypeInfo.java | 16 +-
...CopyVersionedSerializerTypeSerializerProxy.java | 43 +++
.../paimon/flink/source/FileIndexScanSource.java | 299 +++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../flink/procedure/FileIndexProcedureITCase.java | 235 +++++++++++++
16 files changed, 1254 insertions(+), 61 deletions(-)
diff --git a/docs/content/concepts/specification.md
b/docs/content/concepts/specification.md
index 3116f2eed..8463b51ed 100644
--- a/docs/content/concepts/specification.md
+++ b/docs/content/concepts/specification.md
@@ -247,7 +247,33 @@ Global Index is in the index directory, currently, only
two places will use glob
## Data File Index
-Define `file-index.bloom-filter.columns`, Paimon will create its corresponding
index file for each file. If the index
-file is too small, it will be stored directly in the manifest, or in the
directory of the data file. Each data file
-corresponds to an index file, which has a separate file definition and can
contain different types of indexes with
-multiple columns.
+### Concept
+
+Data file index is an external index file corresponding to a certain data
file. If the index file is too small, it will
+be stored directly in the manifest, otherwise in the directory of the data
file. Each data file corresponds to an index file,
+which has a separate file definition and can contain different types of
indexes with multiple columns.
+
+### Usage
+
+Different file indexes are efficient in different scenarios. For example, a bloom filter may speed up queries in point lookup
+scenarios, while a bitmap may consume more space but can result in greater accuracy. Currently only the bloom filter is
+implemented; other types of index will be supported in the future.
+
+Currently, file index is only supported in append-only table.
+
+`Bloom Filter`:
+* `file-index.bloom-filter.columns`: specify the columns that need a bloom filter index.
+* `file-index.bloom-filter.<column_name>.fpp`: configure the false positive probability.
+* `file-index.bloom-filter.<column_name>.items`: configure the expected number of distinct items in one data file.
+
+
+More filter types will be supported...
+
+### Procedure
+
+If you want to add a file index to an existing table without rewriting its data files, you can use the `file_index_rewrite`
+procedure. Before running the procedure, configure the appropriate options on the target table, e.g. use an ALTER clause to
+set `file-index.<filter-type>.columns` on the table.
+
+How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures"
>}})
+
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index cfa905628..c855ee1bf 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -273,6 +273,23 @@ All available procedures are listed below.
<li>tableName: the target table identifier.</li>
</td>
<td>CALL sys.repair('test_db.T')</td>
+ </tr>
+ <tr>
+ <td>file_index_rewrite</td>
+ <td>
+ CALL sys.file_index_rewrite(<identifier> [,
<partitions>])<br/><br/>
+ </td>
+ <td>
+ Rewrite the file index for the table. Arguments:
+ <li>identifier: <databaseName>.<tableName>.</li>
+ <li>partitions: partition filter (optional); separate multiple partitions with ';'.</li>
+ </td>
+ <td>
+ -- rewrite the file index for the whole table<br/>
+ CALL sys.file_index_rewrite('test_db.T')<br/><br/>
+ -- rewrite the file index for a specific partition<br/>
+ CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
+ </td>
</tr>
</tbody>
</table>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index 4541cc46d..07ee5a18a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -137,13 +138,13 @@ public final class FileIndexFormat {
public void writeColumnIndexes(Map<String, Map<String, byte[]>>
indexes)
throws IOException {
- Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new
HashMap<>();
+ Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new
LinkedHashMap<>();
// construct body
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
for (Map.Entry<String, Map<String, byte[]>> columnMap :
indexes.entrySet()) {
Map<String, Pair<Integer, Integer>> innerMap =
- bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new
HashMap<>());
+ bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new
LinkedHashMap<>());
Map<String, byte[]> bytesMap = columnMap.getValue();
for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
int startPosition = baos.size();
@@ -335,6 +336,21 @@ public final class FileIndexFormat {
return b;
}
+    /**
+     * Reads every serialized index described by the header of this index file.
+     *
+     * <p>A {@link LinkedHashMap} is used so the entries keep the order in which the header is
+     * iterated. The writer side ({@code Writer#writeColumnIndexes}) also uses insertion-ordered
+     * maps, so a read-then-rewrite round trip keeps a stable layout.
+     *
+     * @return column name -> (index type -> serialized index bytes)
+     */
+    public Map<String, Map<String, byte[]>> readAll() {
+        Map<String, Map<String, byte[]>> result = new LinkedHashMap<>();
+        for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> columnEntry :
+                header.entrySet()) {
+            for (Map.Entry<String, Pair<Integer, Integer>> typeEntry :
+                    columnEntry.getValue().entrySet()) {
+                // Created lazily so that columns without any index type are not reported.
+                result.computeIfAbsent(columnEntry.getKey(), key -> new LinkedHashMap<>())
+                        .put(
+                                typeEntry.getKey(),
+                                getBytesWithStartAndLength(typeEntry.getValue()));
+            }
+        }
+        return result;
+    }
+
@VisibleForTesting
// only for test yet
Optional<byte[]> getBytesWithNameAndType(String columnName, String
indexType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 834710725..7b2785261 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -376,6 +376,26 @@ public class DataFileMeta {
fileSource);
}
+    /**
+     * Creates a copy of this meta that uses {@code newEmbeddedIndex} as its embedded index. All
+     * other attributes, including the extra files, are carried over unchanged.
+     *
+     * @param newEmbeddedIndex the serialized index to embed in the copy
+     * @return the new meta instance
+     */
+    public DataFileMeta copy(byte[] newEmbeddedIndex) {
+        return new DataFileMeta(
+                this.fileName,
+                this.fileSize,
+                this.rowCount,
+                this.minKey,
+                this.maxKey,
+                this.keyStats,
+                this.valueStats,
+                this.minSequenceNumber,
+                this.maxSequenceNumber,
+                this.schemaId,
+                this.level,
+                this.extraFiles,
+                this.creationTime,
+                this.deleteRowCount,
+                newEmbeddedIndex,
+                this.fileSource);
+    }
+
@Override
public boolean equals(Object o) {
if (o == this) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index ef83e8598..f7da2760e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -76,6 +76,25 @@ public class DataFilePathFactory {
return new Path(filePath.getParent(), filePath.getName() +
INDEX_PATH_SUFFIX);
}
+    /**
+     * Derives the path of the next-generation index file from an existing index file path.
+     *
+     * <p>Only the part of the name before the final extension (the "stem") is inspected:
+     *
+     * <ul>
+     *   <li>if the stem ends with {@code -<number>}, the number is incremented, e.g. {@code
+     *       data-0.orc-1.index -> data-0.orc-2.index};
+     *   <li>otherwise {@code -1} is appended to the stem, e.g. {@code data-0.orc.index ->
+     *       data-0.orc-1.index}.
+     * </ul>
+     *
+     * <p>Names without any dot, or whose last dash comes after the last dot, are handled as
+     * well. Previously these inputs failed with a {@link StringIndexOutOfBoundsException}.
+     *
+     * @param filePath path of the current index file
+     * @return sibling path for the rewritten index file, ending with {@code INDEX_PATH_SUFFIX}
+     */
+    public static Path fileIndexPathIncrease(Path filePath) {
+        String fileName = filePath.getName();
+        int dot = fileName.lastIndexOf('.');
+        String stem = dot == -1 ? fileName : fileName.substring(0, dot);
+        int dash = stem.lastIndexOf('-');
+        if (dash != -1) {
+            try {
+                int num = Integer.parseInt(stem.substring(dash + 1));
+                return new Path(
+                        filePath.getParent(),
+                        stem.substring(0, dash + 1) + (num + 1) + INDEX_PATH_SUFFIX);
+            } catch (NumberFormatException ignored) {
+                // The trailing segment is not a counter (e.g. "0.orc"); start a new one below.
+            }
+        }
+        return new Path(filePath.getParent(), stem + "-1" + INDEX_PATH_SUFFIX);
+    }
+
public static String formatIdentifier(String fileName) {
int index = fileName.lastIndexOf('.');
if (index == -1) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
index 0ec473fed..c21d5418a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -64,7 +64,11 @@ public final class FileIndexWriter implements Closeable {
private byte[] embeddedIndexBytes;
public FileIndexWriter(
- FileIO fileIO, Path path, RowType rowType, FileIndexOptions
fileIndexOptions) {
+ FileIO fileIO,
+ Path path,
+ RowType rowType,
+ FileIndexOptions fileIndexOptions,
+ @Nullable Map<String, String> evolutionMap) {
this.fileIO = fileIO;
this.path = path;
List<DataField> fields = rowType.getFields();
@@ -78,7 +82,15 @@ public final class FileIndexWriter implements Closeable {
for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
fileIndexOptions.entrySet()) {
FileIndexOptions.Column entryColumn = entry.getKey();
- String columnName = entryColumn.getColumnName();
+ String tempName = entryColumn.getColumnName();
+ if (evolutionMap != null) {
+ tempName = evolutionMap.getOrDefault(tempName, null);
+ if (tempName == null) {
+ continue;
+ }
+ }
+
+ final String columnName = tempName;
DataField field = map.get(columnName);
if (field == null) {
throw new IllegalArgumentException(columnName + " does not
exist in column fields");
@@ -135,16 +147,7 @@ public final class FileIndexWriter implements Closeable {
@Override
public void close() throws IOException {
- Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
-
- for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
- Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
- for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
- indexMaps
- .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
- .put(indexMaintainer.getIndexType(), entry.getValue());
- }
- }
+ Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (FileIndexFormat.Writer writer =
FileIndexFormat.createWriter(baos)) {
@@ -152,7 +155,7 @@ public final class FileIndexWriter implements Closeable {
}
if (baos.size() > inManifestThreshold) {
- try (OutputStream outputStream = fileIO.newOutputStream(path,
false)) {
+ try (OutputStream outputStream = fileIO.newOutputStream(path,
true)) {
outputStream.write(baos.toByteArray());
}
resultFileName = path.getName();
@@ -161,6 +164,19 @@ public final class FileIndexWriter implements Closeable {
}
}
+    /**
+     * Serializes the current in-memory state of every index maintainer without writing anything
+     * to the file system.
+     *
+     * @return column name -> (index type -> serialized index bytes)
+     */
+    public Map<String, Map<String, byte[]>> serializeMaintainers() {
+        Map<String, Map<String, byte[]>> byColumn = new HashMap<>();
+        indexMaintainers
+                .values()
+                .forEach(
+                        maintainer ->
+                                maintainer
+                                        .serializedBytes()
+                                        .forEach(
+                                                (column, bytes) ->
+                                                        byColumn.computeIfAbsent(
+                                                                        column,
+                                                                        k -> new HashMap<>())
+                                                                .put(
+                                                                        maintainer.getIndexType(),
+                                                                        bytes)));
+        return byColumn;
+    }
+
public FileIndexResult result() {
return FileIndexResult.of(embeddedIndexBytes, resultFileName);
}
@@ -168,9 +184,19 @@ public final class FileIndexWriter implements Closeable {
@Nullable
public static FileIndexWriter create(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions
fileIndexOptions) {
+ return create(fileIO, path, rowType, fileIndexOptions, null);
+ }
+
+ /**
+ * Creates a writer for the indexes configured in {@code fileIndexOptions}.
+ *
+ * @param evolutionMap when non-null, maps each configured column name to the column name used
+ * in {@code rowType}; configured columns missing from the map are skipped instead of failing
+ * @return the writer, or {@code null} if {@code fileIndexOptions} is empty
+ */
+ @Nullable
+ public static FileIndexWriter create(
+ FileIO fileIO,
+ Path path,
+ RowType rowType,
+ FileIndexOptions fileIndexOptions,
+ @Nullable Map<String, String> evolutionMap) {
return fileIndexOptions.isEmpty()
? null
- : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
+ : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions,
evolutionMap);
}
/** File index result. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index 83c2be59a..6d3e1a961 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -23,8 +23,13 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.utils.VersionedObjectSerializer;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.function.Function;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -77,6 +82,19 @@ public class ManifestEntrySerializer extends
VersionedObjectSerializer<ManifestE
dataFileMetaSerializer.fromRow(row.getRow(4,
dataFileMetaSerializer.numFields())));
}
+    /**
+     * Serializes {@code manifestEntry} into a standalone byte array via {@code serialize}.
+     *
+     * @throws IOException if serialization fails
+     */
+    public byte[] serializeToBytes(ManifestEntry manifestEntry) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        serialize(manifestEntry, new DataOutputViewStreamWrapper(buffer));
+        return buffer.toByteArray();
+    }
+
+    /**
+     * Reads back a {@link ManifestEntry} produced by {@code serializeToBytes}.
+     *
+     * @throws IOException if deserialization fails
+     */
+    public ManifestEntry deserializeFromBytes(byte[] bytes) throws IOException {
+        return deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
+    }
+
public static Function<InternalRow, BinaryRow> partitionGetter() {
return row -> deserializeBinaryRow(row.getBinary(2));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
new file mode 100644
index 000000000..21582ae28
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.FileIndexSink;
+import
org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
+import org.apache.paimon.flink.source.FileIndexScanSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+
+/** Migrate procedure to migrate hive table to paimon table. */
+public class FileIndexProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "file_index_rewrite";
+ }
+
+ public String[] call(ProcedureContext procedureContext, String
sourceTablePath)
+ throws Exception {
+ return call(procedureContext, sourceTablePath, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String sourceTablePath, String
partitions)
+ throws Exception {
+
+ StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
+ Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
+
+ List<Map<String, String>> partitionList =
+ StringUtils.isBlank(partitions) ? null :
getPartitions(partitions.split(";"));
+
+ Predicate partitionPredicate;
+ if (partitionList != null) {
+ // This predicate is based on the row type of the original table,
not bucket table.
+ // Because TableScan in BucketsTable is the same with
FileStoreTable,
+ // and partition filter is done by scan.
+ partitionPredicate =
+ PredicateBuilder.or(
+ partitionList.stream()
+ .map(
+ p ->
+ PredicateBuilder.partition(
+ p,
+ table.rowType(),
+
CoreOptions.PARTITION_DEFAULT_NAME
+
.defaultValue()))
+ .toArray(Predicate[]::new));
+ } else {
+ partitionPredicate = null;
+ }
+
+ TopoBuilder.build(env, (FileStoreTable) table, partitionPredicate);
+
+ return execute(env, "Add file index for table: " + sourceTablePath);
+ }
+
+ private static class TopoBuilder {
+
+ public static void build(
+ StreamExecutionEnvironment env,
+ FileStoreTable table,
+ Predicate partitionPredicate) {
+ DataStreamSource<ManifestEntry> source =
+ env.fromSource(
+ new FileIndexScanSource(table, partitionPredicate),
+ WatermarkStrategy.noWatermarks(),
+ "index source",
+ new TypeInfo());
+ new FileIndexSink(table).sinkFrom(source);
+ }
+ }
+
+ private static class TypeInfo extends GenericTypeInfo<ManifestEntry> {
+
+ public TypeInfo() {
+ super(ManifestEntry.class);
+ }
+
+ @Override
+ public TypeSerializer<ManifestEntry> createSerializer(SerializerConfig
config) {
+ return new NoneCopyVersionedSerializerTypeSerializerProxy<>(
+ () ->
+ new SimpleVersionedSerializer<ManifestEntry>() {
+ private final ManifestEntrySerializer
manifestEntrySerializer =
+ new ManifestEntrySerializer();
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(ManifestEntry
manifestEntry)
+ throws IOException {
+ return
manifestEntrySerializer.serializeToBytes(manifestEntry);
+ }
+
+ @Override
+ public ManifestEntry deserialize(int i, byte[]
bytes)
+ throws IOException {
+ return
manifestEntrySerializer.deserializeFromBytes(bytes);
+ }
+ });
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
index a29956bf7..dcb87238b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
@@ -23,7 +23,6 @@ import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
/** Type information of {@link Committable}. */
public class CommittableTypeInfo extends TypeInformation<Committable> {
@@ -61,18 +60,8 @@ public class CommittableTypeInfo extends
TypeInformation<Committable> {
@Override
public TypeSerializer<Committable> createSerializer(ExecutionConfig
config) {
// no copy, so that data from writer is directly going into committer
while chaining
- return new SimpleVersionedSerializerTypeSerializerProxy<Committable>(
- () -> new CommittableSerializer(new
CommitMessageSerializer())) {
- @Override
- public Committable copy(Committable from) {
- return from;
- }
-
- @Override
- public Committable copy(Committable from, Committable reuse) {
- return from;
- }
- };
+ // NOTE(review): the trailing {} creates an anonymous subclass of the proxy. The proxy itself is
+ // instantiated directly elsewhere (FileIndexProcedure.TypeInfo), so presumably the subclass is
+ // kept deliberately (e.g. to keep the serializer class name stable) - confirm or drop it.
+ return new NoneCopyVersionedSerializerTypeSerializerProxy<Committable>(
+ () -> new CommittableSerializer(new
CommitMessageSerializer())) {};
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
index c33ab95e1..1c3d1e8dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
@@ -24,7 +24,6 @@ import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
/** Type information of {@link AppendOnlyCompactionTask}. */
public class CompactionTaskTypeInfo extends
TypeInformation<AppendOnlyCompactionTask> {
@@ -62,19 +61,8 @@ public class CompactionTaskTypeInfo extends
TypeInformation<AppendOnlyCompaction
@Override
public TypeSerializer<AppendOnlyCompactionTask>
createSerializer(ExecutionConfig config) {
// we don't need copy for task
- return new
SimpleVersionedSerializerTypeSerializerProxy<AppendOnlyCompactionTask>(
- () -> new CompactionTaskSimpleSerializer(new
CompactionTaskSerializer())) {
- @Override
- public AppendOnlyCompactionTask copy(AppendOnlyCompactionTask
from) {
- return from;
- }
-
- @Override
- public AppendOnlyCompactionTask copy(
- AppendOnlyCompactionTask from, AppendOnlyCompactionTask
reuse) {
- return from;
- }
- };
+ // NOTE(review): the trailing {} creates an anonymous subclass of the proxy. The proxy itself is
+ // instantiated directly elsewhere (FileIndexProcedure.TypeInfo), so presumably the subclass is
+ // kept deliberately (e.g. to keep the serializer class name stable) - confirm or drop it.
+ return new
NoneCopyVersionedSerializerTypeSerializerProxy<AppendOnlyCompactionTask>(
+ () -> new CompactionTaskSimpleSerializer(new
CompactionTaskSerializer())) {};
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
new file mode 100644
index 000000000..a69f3f23c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexCommon;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.flink.procedure.FileIndexProcedure;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.FileIndexWriter;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.io.DataFilePathFactory.fileIndexPathIncrease;
+import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+
+/** File index sink for {@link FileIndexProcedure}. */
+public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
+
+    /** Creates a sink that rewrites the file indexes of {@code table}. */
+    public FileIndexSink(FileStoreTable table) {
+        super(table, null);
+    }
+
+    /**
+     * Returns the file-index rewriting operator. The given write provider and commit user are not
+     * needed: data files are only read, and their indexes are rewritten.
+     */
+    @Override
+    protected OneInputStreamOperator<ManifestEntry, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        Options operatorOptions = table.coreOptions().toConfiguration();
+        return new FileIndexModificationOperator(operatorOptions, table);
+    }
+
+    /**
+     * Operator that rewrites the file index of every incoming {@link ManifestEntry}. It buffers
+     * one commit message per data file until the next {@code prepareCommit}.
+     */
+    private static class FileIndexModificationOperator
+            extends PrepareCommitOperator<ManifestEntry, Committable> {
+
+        private final FileStoreTable table;
+
+        private transient FileIndexProcessor fileIndexProcessor;
+        private transient List<CommitMessage> pendingMessages;
+
+        public FileIndexModificationOperator(Options options, FileStoreTable table) {
+            super(options);
+            this.table = table;
+        }
+
+        @Override
+        public void setup(
+                StreamTask<?, ?> containingTask,
+                StreamConfig config,
+                Output<StreamRecord<Committable>> output) {
+            super.setup(containingTask, config, output);
+            // Transient state is (re)created here because it is not serialized with the operator.
+            this.fileIndexProcessor = new FileIndexProcessor(table);
+            this.pendingMessages = new ArrayList<>();
+        }
+
+        @Override
+        public void processElement(StreamRecord<ManifestEntry> element) throws Exception {
+            ManifestEntry entry = element.getValue();
+            BinaryRow partition = entry.partition();
+            int bucket = entry.bucket();
+            DataFileMeta original = entry.file();
+            DataFileMeta reindexed = fileIndexProcessor.process(partition, bucket, original);
+
+            // Reported as a compact increment: the original file is swapped for its re-indexed copy.
+            CompactIncrement replacement =
+                    new CompactIncrement(
+                            Collections.singletonList(original),
+                            Collections.singletonList(reindexed),
+                            Collections.emptyList());
+            pendingMessages.add(
+                    new CommitMessageImpl(
+                            partition, bucket, DataIncrement.emptyIncrement(), replacement));
+        }
+
+        @Override
+        protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+                throws IOException {
+            List<Committable> committables = new ArrayList<>(pendingMessages.size());
+            for (CommitMessage message : pendingMessages) {
+                committables.add(new Committable(checkpointId, Committable.Kind.FILE, message));
+            }
+            pendingMessages.clear();
+            return committables;
+        }
+    }
+
+    /**
+     * Rewrites the file index of individual data files.
+     *
+     * <p>For each data file, the processor:
+     *
+     * <ul>
+     *   <li>loads the entries of its current index file, if any;
+     *   <li>drops entries that are no longer expected;
+     *   <li>rebuilds the configured indexes from the file's rows;
+     *   <li>stores the merged result either embedded in the returned {@link DataFileMeta} or in a
+     *       new index file, depending on {@code fileIndexInManifestThreshold}.
+     * </ul>
+     */
+    public static class FileIndexProcessor {
+
+        private final FileStoreTable table;
+        private final FileIndexOptions fileIndexOptions;
+        private final FileIO fileIO;
+        private final FileStorePathFactory pathFactory;
+        private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap;
+        private final SchemaCache schemaCache;
+        private final long sizeInMeta;
+
+        public FileIndexProcessor(FileStoreTable table) {
+            this.table = table;
+            this.fileIndexOptions = table.coreOptions().indexColumnsOptions();
+            this.fileIO = table.fileIO();
+            this.pathFactory = table.store().pathFactory();
+            this.dataFilePathFactoryMap = new HashMap<>();
+            this.schemaCache =
+                    new SchemaCache(fileIndexOptions, new SchemaManager(fileIO, table.location()));
+            this.sizeInMeta = table.coreOptions().fileIndexInManifestThreshold();
+        }
+
+        /**
+         * Rewrites the file index of {@code dataFileMeta}.
+         *
+         * @param partition partition of the data file
+         * @param bucket bucket of the data file
+         * @param dataFileMeta the data file whose index is rewritten; it is not modified
+         * @return a copy of {@code dataFileMeta} whose extra files and embedded index describe
+         *     only the rewritten index
+         * @throws IOException if reading the data file or reading/writing an index file fails
+         */
+        public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFileMeta)
+                throws IOException {
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            Pair.of(partition, bucket),
+                            p -> pathFactory.createDataFilePathFactory(partition, bucket));
+
+            Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>> t4 =
+                    schemaCache.schemaInfo(dataFileMeta.schemaId());
+            RowType fileSchema = t4.f0;
+            Map<String, String> evolutionNameMap = t4.f1;
+            int[] projection = t4.f2;
+            Map<String, Set<String>> expectedFileIndex = t4.f3;
+
+            // Old index files are replaced by the rewritten index; other extra files are kept.
+            List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
+            List<String> indexFiles =
+                    dataFileMeta.extraFiles().stream()
+                            .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+                            .collect(Collectors.toList());
+            extras.removeAll(indexFiles);
+
+            Path newIndexPath;
+            Map<String, Map<String, byte[]>> maintainers;
+            if (!indexFiles.isEmpty()) {
+                // Load the existing entries. The new file gets an incremented name so that the old
+                // one, still referenced by the original meta, is not overwritten.
+                Path oldIndexPath = dataFilePathFactory.toPath(indexFiles.get(0));
+                try (FileIndexFormat.Reader indexReader =
+                        FileIndexFormat.createReader(
+                                fileIO.newInputStream(oldIndexPath), fileSchema)) {
+                    maintainers = indexReader.readAll();
+                }
+                newIndexPath = fileIndexPathIncrease(oldIndexPath);
+            } else {
+                maintainers = new HashMap<>();
+                newIndexPath =
+                        toFileIndexPath(dataFilePathFactory.toPath(dataFileMeta.fileName()));
+            }
+
+            retainExpectedIndexes(maintainers, expectedFileIndex);
+
+            // The writer is deliberately not closed: its close() would serialize and write the
+            // index file itself, whereas here only its in-memory maintainers are merged below.
+            FileIndexWriter fileIndexWriter =
+                    FileIndexWriter.create(
+                            fileIO,
+                            newIndexPath,
+                            fileSchema.project(projection),
+                            fileIndexOptions,
+                            evolutionNameMap);
+            if (fileIndexWriter != null) {
+                try (RecordReader<InternalRow> reader =
+                        table.newReadBuilder()
+                                .withProjection(projection)
+                                .newRead()
+                                .createReader(
+                                        DataSplit.builder()
+                                                .withPartition(partition)
+                                                .withBucket(bucket)
+                                                .withBucketPath(
+                                                        pathFactory
+                                                                .bucketPath(partition, bucket)
+                                                                .toString())
+                                                .withDataFiles(
+                                                        Collections.singletonList(dataFileMeta))
+                                                .rawConvertible(true)
+                                                .build())) {
+                    reader.forEachRemaining(fileIndexWriter::write);
+                }
+
+                // Rebuilt indexes override loaded entries of the same column and index type.
+                fileIndexWriter
+                        .serializeMaintainers()
+                        .forEach(
+                                (column, typeBytes) ->
+                                        maintainers
+                                                .computeIfAbsent(column, k -> new HashMap<>())
+                                                .putAll(typeBytes));
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (FileIndexFormat.Writer indexWriter = FileIndexFormat.createWriter(baos)) {
+                if (!maintainers.isEmpty()) {
+                    indexWriter.writeColumnIndexes(maintainers);
+                }
+            }
+
+            // Keep at most one index representation. The old index files were already removed
+            // from `extras`, and the old embedded index is always replaced (cleared, or set to
+            // the new bytes), so no stale copy of either survives.
+            if (baos.size() > sizeInMeta) {
+                try (OutputStream outputStream = fileIO.newOutputStream(newIndexPath, true)) {
+                    outputStream.write(baos.toByteArray());
+                }
+                extras.add(newIndexPath.getName());
+                return dataFileMeta.copy(extras).copy((byte[]) null);
+            }
+            byte[] embeddedIndex = baos.size() == 0 ? null : baos.toByteArray();
+            return dataFileMeta.copy(extras).copy(embeddedIndex);
+        }
+
+        /**
+         * Removes loaded entries that are no longer expected.
+         *
+         * <p>Columns not in {@code expectedFileIndex} are dropped entirely. For expected columns,
+         * index types not listed for the column are dropped. Columns left with no index type are
+         * removed as well.
+         */
+        private static void retainExpectedIndexes(
+                Map<String, Map<String, byte[]>> maintainers,
+                Map<String, Set<String>> expectedFileIndex) {
+            maintainers
+                    .entrySet()
+                    .removeIf(
+                            entry -> {
+                                if (!expectedFileIndex.containsKey(entry.getKey())) {
+                                    return true;
+                                }
+                                Set<String> expectedTypes = expectedFileIndex.get(entry.getKey());
+                                if (expectedTypes != null) {
+                                    // retainAll avoids the ConcurrentModificationException of
+                                    // removing keys while iterating over the same key set.
+                                    entry.getValue().keySet().retainAll(expectedTypes);
+                                }
+                                return entry.getValue().isEmpty();
+                            });
+        }
+    }
+
+ /** Schema id to specified information related to schema. */
+ private static class SchemaCache {
+
+ private final FileIndexOptions fileIndexOptions;
+ private final SchemaManager schemaManager;
+ private final TableSchema latest;
+ private final Map<
+ Long, Tuple4<RowType, Map<String, String>, int[],
Map<String, Set<String>>>>
+ schemaInfos;
+
+ private final Set<Long> fileSchemaIds;
+
+ public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager
schemaManager) {
+ this.fileIndexOptions = fileIndexOptions;
+ this.schemaManager = schemaManager;
+ this.latest =
schemaManager.latest().orElseThrow(RuntimeException::new);
+ this.schemaInfos = new HashMap<>();
+ this.fileSchemaIds = new HashSet<>();
+ }
+
+ public Tuple4<RowType, Map<String, String>, int[], Map<String,
Set<String>>> schemaInfo(
+ long schemaId) {
+ if (!fileSchemaIds.contains(schemaId)) {
+ RowType fileSchema =
schemaManager.schema(schemaId).logicalRowType();
+ Map<String, String> evolutionmap =
+ schemaId == latest.id()
+ ? null
+ : createIndexNameMapping(latest.fields(),
fileSchema.getFields());
+
+ List<String> projectedColumnNames = new ArrayList<>();
+ Map<String, Set<String>> expectedIndexNameType = new
HashMap<>();
+ for (Map.Entry<FileIndexOptions.Column, Map<String, Options>>
entry :
+ fileIndexOptions.entrySet()) {
+ FileIndexOptions.Column column = entry.getKey();
+ String columnName;
+ if (evolutionmap != null) {
+ columnName =
evolutionmap.getOrDefault(column.getColumnName(), null);
+ // if column name has no corresponding field, then we
just skip it
+ if (columnName == null) {
+ continue;
+ }
+ } else {
+ columnName = column.getColumnName();
+ }
+ projectedColumnNames.add(columnName);
+ String fullColumnName =
+ column.isNestedColumn()
+ ? FileIndexCommon.toMapKey(
+ columnName,
column.getNestedColumnName())
+ : column.getColumnName();
+ expectedIndexNameType
+ .computeIfAbsent(fullColumnName, name -> new
HashSet<>())
+ .addAll(entry.getValue().keySet());
+ }
+
+ schemaInfos.put(
+ schemaId,
+ Tuple4.of(
+ fileSchema,
+ evolutionmap,
+ projectedColumnNames.stream()
+ .mapToInt(fileSchema::getFieldIndex)
+ .toArray(),
+ expectedIndexNameType));
+ fileSchemaIds.add(schemaId);
+ }
+
+ return schemaInfos.get(schemaId);
+ }
+
+ private static Map<String, String> createIndexNameMapping(
+ List<DataField> tableFields, List<DataField> dataFields) {
+ Map<String, String> indexMapping = new HashMap<>();
+ Map<Integer, String> fieldIdToIndex = new HashMap<>();
+ for (DataField dataField : tableFields) {
+ fieldIdToIndex.put(dataField.id(), dataField.name());
+ }
+
+ for (DataField tableField : dataFields) {
+ String dataFieldIndex =
fieldIdToIndex.getOrDefault(tableField.id(), null);
+ if (dataFieldIndex != null) {
+ indexMapping.put(dataFieldIndex, tableField.name());
+ }
+ }
+
+ return indexMapping;
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
index 48efd349d..f82f08209 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
@@ -23,7 +23,6 @@ import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
/** Type information of {@link MultiTableCommittable}. */
public class MultiTableCommittableTypeInfo extends
TypeInformation<MultiTableCommittable> {
@@ -61,19 +60,8 @@ public class MultiTableCommittableTypeInfo extends
TypeInformation<MultiTableCom
@Override
public TypeSerializer<MultiTableCommittable>
createSerializer(ExecutionConfig config) {
// no copy, so that data from writer is directly going into committer
while chaining
- return new
SimpleVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
- () -> new MultiTableCommittableSerializer(new
CommitMessageSerializer())) {
- @Override
- public MultiTableCommittable copy(MultiTableCommittable from) {
- return from;
- }
-
- @Override
- public MultiTableCommittable copy(
- MultiTableCommittable from, MultiTableCommittable reuse) {
- return from;
- }
- };
+        // The no-copy behavior is now inherited from NoneCopyVersionedSerializerTypeSerializerProxy.
+        // NOTE(review): the trailing {} still creates an anonymous subclass that overrides nothing;
+        // confirm whether it is required before removing it.
+ return new
NoneCopyVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
+ () -> new MultiTableCommittableSerializer(new
CommitMessageSerializer())) {};
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoneCopyVersionedSerializerTypeSerializerProxy.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoneCopyVersionedSerializerTypeSerializerProxy.java
new file mode 100644
index 000000000..3df58eb93
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoneCopyVersionedSerializerTypeSerializerProxy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.util.function.SerializableSupplier;
+
+/**
+ * A {@link SimpleVersionedSerializerTypeSerializerProxy} whose {@code copy} methods return the
+ * given record itself instead of a copy.
+ *
+ * <p>Only use it for records that are not mutated after being emitted: chained operators then
+ * share the same object instance.
+ *
+ * @param <T> type of the serialized records
+ */
+public class NoneCopyVersionedSerializerTypeSerializerProxy<T>
+        extends SimpleVersionedSerializerTypeSerializerProxy<T> {
+
+    // TypeSerializer is Serializable; pin the serialized form explicitly.
+    private static final long serialVersionUID = 1L;
+
+    public NoneCopyVersionedSerializerTypeSerializerProxy(
+            SerializableSupplier<SimpleVersionedSerializer<T>> serializerSupplier) {
+        super(serializerSupplier);
+    }
+
+    /** Returns {@code from} itself; no copy is made. */
+    @Override
+    public T copy(T from) {
+        return from;
+    }
+
+    /** Returns {@code from} itself; {@code reuse} is ignored. */
+    @Override
+    public T copy(T from, T reuse) {
+        return from;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
new file mode 100644
index 000000000..fa83aad83
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Bounded Flink {@link Source} that emits the {@link ManifestEntry manifest entries} of the
+ * table's current files, optionally restricted by a partition predicate. Each entry is one split.
+ * It does not monitor new snapshots.
+ */
+public class FileIndexScanSource
+        implements Source<
+                ManifestEntry, FileIndexScanSource.Split, FileIndexScanSource.CheckpointState> {
+
+    private static final long serialVersionUID = 2319102734891237489L;
+
+    private final FileStoreTable table;
+    @Nullable private final Predicate partitionPredicate;
+
+    public FileIndexScanSource(FileStoreTable table, @Nullable Predicate partitionPredicate) {
+        this.table = table;
+        this.partitionPredicate = partitionPredicate;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SplitEnumerator<Split, CheckpointState> createEnumerator(
+            SplitEnumeratorContext<Split> splitEnumeratorContext) throws Exception {
+        List<ManifestEntry> manifestEntries =
+                table.store().newScan().withPartitionFilter(partitionPredicate).plan().files();
+        return new ManifestFileSplitEnumerator(
+                splitEnumeratorContext,
+                manifestEntries.stream().map(Split::new).collect(Collectors.toList()));
+    }
+
+    @Override
+    public SplitEnumerator<Split, CheckpointState> restoreEnumerator(
+            SplitEnumeratorContext<Split> splitEnumeratorContext, CheckpointState checkpointState)
+            throws Exception {
+        return new ManifestFileSplitEnumerator(splitEnumeratorContext, checkpointState.files());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Split> getSplitSerializer() {
+        return new SplitSerde();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<CheckpointState> getEnumeratorCheckpointSerializer() {
+        return new CheckpointSerde();
+    }
+
+    @Override
+    public SourceReader<ManifestEntry, Split> createReader(SourceReaderContext sourceReaderContext)
+            throws Exception {
+        return new Reader(sourceReaderContext);
+    }
+
+    /** Enumerator state: the splits that have not been assigned yet. */
+    public static class CheckpointState {
+
+        private final List<Split> files;
+
+        public CheckpointState(List<Split> files) {
+            this.files = files;
+        }
+
+        public List<Split> files() {
+            return files;
+        }
+    }
+
+    /** Enumerator that assigns one split per reader request until no split is left. */
+    private static class ManifestFileSplitEnumerator
+            implements SplitEnumerator<Split, CheckpointState> {
+
+        private final SplitEnumeratorContext<Split> splitEnumeratorContext;
+        private final List<Split> files;
+
+        public ManifestFileSplitEnumerator(
+                SplitEnumeratorContext<Split> splitEnumeratorContext, List<Split> files) {
+            this.splitEnumeratorContext = splitEnumeratorContext;
+            // own a mutable copy: splits are removed on assignment and added back on failover
+            this.files = new ArrayList<>(files);
+        }
+
+        @Override
+        public void start() {}
+
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+            if (files.isEmpty()) {
+                splitEnumeratorContext.signalNoMoreSplits(subtaskId);
+            } else {
+                splitEnumeratorContext.assignSplit(files.remove(0), subtaskId);
+            }
+        }
+
+        @Override
+        public void addSplitsBack(List<Split> splits, int subtaskId) {
+            files.addAll(splits);
+        }
+
+        @Override
+        public void addReader(int subtaskId) {}
+
+        @Override
+        public CheckpointState snapshotState(long checkpointId) throws Exception {
+            // snapshot a copy so that later assignments cannot alter the checkpointed state
+            return new CheckpointState(new ArrayList<>(files));
+        }
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    /** Split wrapping a single {@link ManifestEntry}. */
+    public static class Split implements SourceSplit {
+
+        private final ManifestEntry manifestEntry;
+
+        public Split(ManifestEntry manifestEntry) {
+            this.manifestEntry = manifestEntry;
+        }
+
+        @Override
+        public String splitId() {
+            // NOTE(review): assumes data file names are unique within a table.
+            return manifestEntry.file().fileName();
+        }
+
+        ManifestEntry entry() {
+            return manifestEntry;
+        }
+    }
+
+    /** Serializer for {@link Split}, delegating to {@link ManifestEntrySerializer}. */
+    private static class SplitSerde implements SimpleVersionedSerializer<Split> {
+
+        // Per instance rather than static: ManifestEntrySerializer is not known to be
+        // thread-safe, and split serializers are used from both coordinator and task threads.
+        private final ManifestEntrySerializer manifestEntrySerializer =
+                new ManifestEntrySerializer();
+
+        public SplitSerde() {}
+
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(Split sourceSplit) throws IOException {
+            return manifestEntrySerializer.serializeToBytes(sourceSplit.entry());
+        }
+
+        @Override
+        public Split deserialize(int version, byte[] bytes) throws IOException {
+            return new Split(manifestEntrySerializer.deserializeFromBytes(bytes));
+        }
+    }
+
+    /**
+     * Serializer for {@link CheckpointState}: the split count followed by each split as a
+     * length-prefixed byte array.
+     */
+    private static class CheckpointSerde implements SimpleVersionedSerializer<CheckpointState> {
+
+        private final SplitSerde splitSerde = new SplitSerde();
+
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(CheckpointState checkpointState) throws IOException {
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            DataOutput dataOutput = new DataOutputStream(byteArrayOutputStream);
+            List<Split> files = checkpointState.files();
+            dataOutput.writeInt(files.size());
+            for (Split file : files) {
+                byte[] bytes = splitSerde.serialize(file);
+                dataOutput.writeInt(bytes.length);
+                dataOutput.write(bytes);
+            }
+            // return what was written; returning an empty array would lose the whole state
+            return byteArrayOutputStream.toByteArray();
+        }
+
+        @Override
+        public CheckpointState deserialize(int version, byte[] bytes) throws IOException {
+            DataInput dataInput = new DataInputStream(new ByteArrayInputStream(bytes));
+            int size = dataInput.readInt();
+            List<Split> files = new ArrayList<>(size);
+            for (int j = 0; j < size; j++) {
+                byte[] splitBytes = new byte[dataInput.readInt()];
+                dataInput.readFully(splitBytes);
+                files.add(splitSerde.deserialize(splitSerde.getVersion(), splitBytes));
+            }
+            return new CheckpointState(files);
+        }
+    }
+
+    /** Reader that requests one split at a time and emits the manifest entry it wraps. */
+    private static class Reader implements SourceReader<ManifestEntry, Split> {
+
+        private final SourceReaderContext context;
+        private final ArrayDeque<Split> splits;
+
+        // Completed when new splits arrive or when no more splits will arrive.
+        // NOTE(review): assumes Flink invokes all reader methods from the task mailbox thread.
+        private CompletableFuture<Void> availability;
+
+        private boolean noMore;
+
+        public Reader(SourceReaderContext sourceReaderContext) {
+            this.context = sourceReaderContext;
+            this.splits = new ArrayDeque<>();
+            this.availability = new CompletableFuture<>();
+        }
+
+        @Override
+        public void start() {
+            context.sendSplitRequest();
+        }
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<ManifestEntry> readerOutput) throws Exception {
+            if (!splits.isEmpty()) {
+                readerOutput.collect(splits.poll().entry());
+                if (!noMore && splits.isEmpty()) {
+                    context.sendSplitRequest();
+                }
+                if (!splits.isEmpty()) {
+                    return InputStatus.MORE_AVAILABLE;
+                }
+            }
+            return noMore ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
+        }
+
+        @Override
+        public List<Split> snapshotState(long checkpointId) {
+            return new ArrayList<>(splits);
+        }
+
+        @Override
+        public CompletableFuture<Void> isAvailable() {
+            if (!splits.isEmpty() || noMore) {
+                return FutureCompletingBlockingQueue.AVAILABLE;
+            }
+            // Hand out a pending future that addSplits / notifyNoMoreSplits completes, instead of
+            // an already-completed one that would make the task busy-poll.
+            if (availability.isDone()) {
+                availability = new CompletableFuture<>();
+            }
+            return availability;
+        }
+
+        @Override
+        public void addSplits(List<Split> list) {
+            splits.addAll(list);
+            availability.complete(null);
+        }
+
+        @Override
+        public void notifyNoMoreSplits() {
+            noMore = true;
+            availability.complete(null);
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 847390dac..2fe8403dc 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -32,6 +32,7 @@ org.apache.paimon.flink.action.QueryServiceActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
org.apache.paimon.flink.procedure.CompactProcedure
+org.apache.paimon.flink.procedure.FileIndexProcedure
org.apache.paimon.flink.procedure.CreateTagProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
org.apache.paimon.flink.procedure.CreateBranchProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
new file mode 100644
index 000000000..1fbbb5869
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/** IT Case for {@link FileIndexProcedure}. */
+public class FileIndexProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testFileIndexProcedureAddIndex() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k INT,"
+ + " v STRING,"
+ + " hh INT,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'bucket' = '-1'"
+ + ")");
+
+ sql(
+ "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100',
16, '20221208'), (1, '100', 15, '20221209')");
+
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')");
+ sql("CALL sys.file_index_rewrite('default.T')");
+
+ FileStoreTable table = paimonTable("T");
+ List<ManifestEntry> list = table.store().newScan().plan().files();
+
+ for (ManifestEntry entry : list) {
+ List<String> extraFiles =
+ entry.file().extraFiles().stream()
+ .filter(s ->
s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+
+ Assertions.assertThat(extraFiles.size()).isEqualTo(1);
+
+ String file = extraFiles.get(0);
+
+ Path indexFilePath =
+ table.store()
+ .pathFactory()
+ .createDataFilePathFactory(entry.partition(),
entry.bucket())
+ .toPath(file);
+ try (FileIndexFormat.Reader reader =
+ FileIndexFormat.createReader(
+ table.fileIO().newInputStream(indexFilePath),
table.rowType())) {
+ Set<FileIndexReader> readerSetK = reader.readColumnIndex("k");
+ Assertions.assertThat(readerSetK.size()).isEqualTo(1);
+
+ Predicate predicateK = new
PredicateBuilder(table.rowType()).equal(0, 1);
+ for (FileIndexReader fileIndexReader : readerSetK) {
+
Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isTrue();
+ }
+
+ predicateK = new PredicateBuilder(table.rowType()).equal(0, 4);
+ for (FileIndexReader fileIndexReader : readerSetK) {
+
Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isFalse();
+ }
+
+ Set<FileIndexReader> readerSetV = reader.readColumnIndex("v");
+ Assertions.assertThat(readerSetV.size()).isEqualTo(1);
+
+ Predicate predicateV =
+ new PredicateBuilder(table.rowType())
+ .equal(1, BinaryString.fromString("100"));
+ for (FileIndexReader fileIndexReader : readerSetV) {
+
Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isTrue();
+ }
+
+ predicateV =
+ new PredicateBuilder(table.rowType())
+ .equal(1, BinaryString.fromString("101"));
+ for (FileIndexReader fileIndexReader : readerSetV) {
+
Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isFalse();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testFileIndexProcedureSchemaEvolution() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k INT,"
+ + " v STRING,"
+ + " hh INT,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'file.format' = 'avro',"
+ + " 'bucket' = '-1'"
+ + ")");
+
+ sql(
+ "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100',
16, '20221208'), (5, '1000', 15, '20221209')");
+
+ sql("ALTER TABLE T RENAME `k` TO order_id");
+
+ sql(
+ "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100',
16, '20221208'), (5, '1000', 15, '20221209')");
+
+ FileStoreTable table = paimonTable("T");
+
+ Predicate predicateK = new PredicateBuilder(table.rowType()).equal(0,
2);
+ Predicate predicateV =
+ new PredicateBuilder(table.rowType()).equal(1,
BinaryString.fromString("101"));
+ RecordReader<InternalRow> reader =
+ table.newRead()
+ .withFilter(PredicateBuilder.and(predicateK,
predicateV))
+ .createReader(table.newScan().plan());
+ AtomicInteger count = new AtomicInteger(0);
+ reader.forEachRemaining(r -> count.incrementAndGet());
+
+ // parquet format predicate would not reduce record read from file
+ Assertions.assertThat(count.get()).isEqualTo(6);
+
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ sql("ALTER TABLE T SET
('file-index.bloom-filter.columns'='order_id,v')");
+ sql("CALL sys.file_index_rewrite('default.T')");
+
+ reader =
+ table.newRead()
+ .withFilter(PredicateBuilder.and(predicateK,
predicateV))
+ .createReader(table.newScan().plan());
+ count.set(0);
+ reader.forEachRemaining(r -> count.incrementAndGet());
+
+ // the whole file is filtered, none record left
+ Assertions.assertThat(count.get()).isEqualTo(0);
+ }
+
+ @Test
+ public void testFileIndexProcedureDropIndex() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k INT,"
+ + " v STRING,"
+ + " hh INT,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'file-index.bloom-filter.columns' = 'k,v',"
+ + " 'bucket' = '-1'"
+ + ")");
+
+ sql(
+ "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100',
16, '20221208'), (1, '100', 15, '20221209')");
+
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k')");
+ sql("CALL sys.file_index_rewrite('default.T')");
+
+ FileStoreTable table = paimonTable("T");
+ List<ManifestEntry> list = table.store().newScan().plan().files();
+
+ for (ManifestEntry entry : list) {
+ List<String> extraFiles =
+ entry.file().extraFiles().stream()
+ .filter(s ->
s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+
+ Assertions.assertThat(extraFiles.size()).isEqualTo(1);
+
+ String file = extraFiles.get(0);
+
+ Path indexFilePath =
+ table.store()
+ .pathFactory()
+ .createDataFilePathFactory(entry.partition(),
entry.bucket())
+ .toPath(file);
+ try (FileIndexFormat.Reader reader =
+ FileIndexFormat.createReader(
+ table.fileIO().newInputStream(indexFilePath),
table.rowType())) {
+ Set<FileIndexReader> readerSetK = reader.readColumnIndex("v");
+
+ Assertions.assertThat(readerSetK.size()).isEqualTo(0);
+ }
+ }
+
+ sql("ALTER TABLE T RESET ('file-index.bloom-filter.columns')");
+
+ sql("CALL sys.file_index_rewrite('default.T')");
+
+ table = paimonTable("T");
+ list = table.store().newScan().plan().files();
+ for (ManifestEntry entry : list) {
+ List<String> extraFiles =
+ entry.file().extraFiles().stream()
+ .filter(s ->
s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+
+ Assertions.assertThat(extraFiles.size()).isEqualTo(0);
+ }
+ }
+}