This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e155308fe [flink] Adjust file index rewriter
e155308fe is described below
commit e155308fe407cec515ac38bf50eac4501039bea0
Author: Jingsong
AuthorDate: Thu Jun 20 14:49:01 2024 +0800
[flink] Adjust file index rewriter
---
docs/content/append-table/file-index.md | 61 +++++++++++
docs/content/concepts/specification.md | 34 +-----
docs/content/flink/procedures.md | 10 +-
...leIndexWriter.java => DataFileIndexWriter.java} | 98 +++++++++--------
.../org/apache/paimon/io/DataFilePathFactory.java | 10 +-
.../org/apache/paimon/io/RowDataFileWriter.java | 24 ++--
.../paimon/manifest/ManifestEntrySerializer.java | 18 ---
.../org/apache/paimon/utils/ObjectSerializer.java | 13 +++
...ocedure.java => RewriteFileIndexProcedure.java} | 40 +++----
...ileIndexSink.java => RewriteFileIndexSink.java} | 122 ++++++++++++---------
...ScanSource.java => RewriteFileIndexSource.java} | 10 +-
.../services/org.apache.paimon.factories.Factory | 2 +-
...e.java => RewriteFileIndexProcedureITCase.java} | 12 +-
13 files changed, 249 insertions(+), 205 deletions(-)
diff --git a/docs/content/append-table/file-index.md b/docs/content/append-table/file-index.md
new file mode 100644
index 000000000..4bf769d6e
--- /dev/null
+++ b/docs/content/append-table/file-index.md
@@ -0,0 +1,61 @@
+---
+title: "File Index"
+weight: 4
+type: docs
+aliases:
+- /append-table/file-index.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Data File Index
+
+If `file-index.bloom-filter.columns` is defined, Paimon creates a corresponding index file for each data file. If the index
+file is small enough, it is stored directly in the manifest; otherwise, it is stored in the directory of the data file.
+Each data file corresponds to one index file, which has a separate file definition and can contain different types of
+indexes on multiple columns.
+
+## Concept
+
+A data file index is an external index file that corresponds to a single data file. If the index file is small enough,
+it is stored directly in the manifest; otherwise, it is stored in the directory of the data file. Each data file
+corresponds to one index file, which has a separate file definition and can contain different types of indexes on multiple columns.
+
+## Usage
+
+Different file indexes are efficient in different scenarios. For example, a bloom filter may speed up queries in point
+lookup scenarios, while a bitmap may consume more space but can be more accurate. Currently only the bloom filter is
+implemented; other types of index will be supported in the future.
+
+Currently, file index is only supported for append-only tables.
+
+`Bloom Filter`:
+* `file-index.bloom-filter.columns`: specify the columns that need a bloom filter index.
+* `file-index.bloom-filter.<column_name>.fpp`: configure the false positive probability.
+* `file-index.bloom-filter.<column_name>.items`: configure the expected number of distinct items in one data file.
+
+More filter types will be supported...
+
+## Procedure
+
+To add a file index to an existing table without rewriting its data files, you can use the `rewrite_file_index` procedure.
+Before calling the procedure, configure the appropriate index options on the target table, for example by using an ALTER
+TABLE statement to set `file-index.<filter-type>.columns`.
+
+How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
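The `fpp` and `items` options above trade index size against accuracy. The standard Bloom filter sizing formulas make that trade-off concrete: for n expected items and a false positive probability p, an optimally sized filter uses m = -n ln p / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. The sketch below only evaluates these textbook formulas; it is not taken from Paimon's bloom filter implementation, and `BloomFilterSizing` is a hypothetical name.

```java
/**
 * Textbook Bloom filter sizing (hypothetical helper, not Paimon code): given n expected
 * items and a target false positive probability p, compute bits m and hash functions k.
 */
final class BloomFilterSizing {

    private BloomFilterSizing() {}

    /** m = ceil(-n * ln(p) / (ln 2)^2) */
    static long optimalNumBits(long expectedItems, double fpp) {
        if (expectedItems <= 0 || fpp <= 0 || fpp >= 1) {
            throw new IllegalArgumentException("need items > 0 and 0 < fpp < 1");
        }
        double ln2 = Math.log(2);
        return (long) Math.ceil(-expectedItems * Math.log(fpp) / (ln2 * ln2));
    }

    /** k = max(1, round(m / n * ln 2)) */
    static int optimalNumHashFunctions(long expectedItems, long numBits) {
        return Math.max(1, (int) Math.round((double) numBits / expectedItems * Math.log(2)));
    }

    public static void main(String[] args) {
        long bits = optimalNumBits(1_000_000, 0.01);
        int hashes = optimalNumHashFunctions(1_000_000, bits);
        // Roughly 9.6 bits per item and 7 hash functions for a 1% false positive rate.
        System.out.printf("bits=%d (%.1f KiB), hashes=%d%n", bits, bits / 8.0 / 1024, hashes);
    }
}
```

For 1,000,000 items at an `fpp` of 0.01 this gives about 9.6 bits per item (a little over 1.1 MiB) and 7 hash functions. If a data file holds more distinct values than the configured `items`, the effective false positive rate rises above `fpp`.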
diff --git a/docs/content/concepts/specification.md b/docs/content/concepts/specification.md
index 8463b51ed..3116f2eed 100644
--- a/docs/content/concepts/specification.md
+++ b/docs/content/concepts/specification.md
@@ -247,33 +247,7 @@ Global Index is in the index directory, currently, only two places will use glob
## Data File Index
-### Concept
-
-Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will
-be stored directly in the manifest, otherwise in the directory of the data file. Each data file corresponds to an index file,
-which has a separate file definition and can contain different types of indexes with multiple columns.
-
-### Usage
-
-Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup
-scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter
-currently, but other types of index will be supported in the future.
-
-Currently, file index is only supported in append-only table.
-
-`Bloom Filter`:
-* `file-index.bloom-filter.columns`: specify the columns that need bloom filter index.
-* `file-index.bloom-filter.<column_name>.fpp` to config false positive probability.
-* `file-index.bloom-filter.<column_name>.items` to config the expected distinct items in one data file.
-
-
-More filter types will be supported...
-
-### Procedure
-
-If you want to add file index to existing table, without any rewrite, you can use `file_index_rewrite` procedure. Before
-we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config
-`file-index.<filter-type>.columns` to the table.
-
-How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
-
+If `file-index.bloom-filter.columns` is defined, Paimon creates a corresponding index file for each data file. If the index
+file is small enough, it is stored directly in the manifest; otherwise, it is stored in the directory of the data file.
+Each data file corresponds to one index file, which has a separate file definition and can contain different types of
+indexes on multiple columns.
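The placement rule described in this paragraph (small indexes kept in the manifest, larger ones written to a separate file) is implemented in `DataFileIndexWriter.close()` later in this commit, which compares `out.size()` with `inManifestThreshold`. Below is a minimal JDK-only sketch of the same decision, assuming `java.nio.file` in place of Paimon's `FileIO`; `IndexPlacementSketch` and `place` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical sketch of the placement rule: serialized index bytes above the threshold
 * go to a separate index file; otherwise they are kept for embedding in the manifest.
 */
final class IndexPlacementSketch {

    /** Exactly one of fileName / embeddedBytes is non-null. */
    static final class Placement {
        final String fileName;
        final byte[] embeddedBytes;

        Placement(String fileName, byte[] embeddedBytes) {
            this.fileName = fileName;
            this.embeddedBytes = embeddedBytes;
        }
    }

    static Placement place(byte[] serializedIndex, long inManifestThreshold, Path indexPath)
            throws IOException {
        if (serializedIndex.length > inManifestThreshold) {
            // Too large for the manifest: write (or overwrite) a separate index file.
            Files.write(indexPath, serializedIndex);
            return new Placement(indexPath.getFileName().toString(), null);
        }
        // Small enough: keep the bytes so the caller can store them in the manifest entry.
        return new Placement(null, serializedIndex);
    }
}
```

Because the comparison is strict (`>`), an index whose size equals the threshold stays in the manifest.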
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index c855ee1bf..da7b7ff5b 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -275,20 +275,20 @@ All available procedures are listed below.
<td>CALL sys.repair('test_db.T')</td>
</tr>
<tr>
- <td>file_index_rewrite</td>
+ <td>rewrite_file_index</td>
<td>
- CALL sys.file_index_rewrite(<identifier> [, <partitions>])<br/><br/>
+ CALL sys.rewrite_file_index(<identifier> [, <partitions>])<br/><br/>
</td>
<td>
Rewrite the file index for the table. Argument:
<li>identifier: <databaseName>.<tableName>.</li>
- <li>partitions : partition filter.</li>
+ <li>partitions : specific partitions.</li>
</td>
<td>
-- rewrite the file index for the whole table<br/>
- CALL sys.file_index_rewrite('test_db.T')<br/><br/>
+ CALL sys.rewrite_file_index('test_db.T')<br/><br/>
-- repair all tables in a specific partition<br/>
- CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
+ CALL sys.rewrite_file_index('test_db.T', 'pt=a')<br/><br/>
</td>
</tr>
</tbody>
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
similarity index 78%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
rename to paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
index c21d5418a..83d0cf078 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFormat;
import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -45,8 +46,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-/** Index file writer. */
-public final class FileIndexWriter implements Closeable {
+/** Index file writer for a data file. */
+public final class DataFileIndexWriter implements Closeable {
public static final FileIndexResult EMPTY_RESULT =
FileIndexResult.of(null, null);
@@ -63,12 +64,12 @@ public final class FileIndexWriter implements Closeable {
private byte[] embeddedIndexBytes;
- public FileIndexWriter(
+ public DataFileIndexWriter(
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
- @Nullable Map<String, String> evolutionMap) {
+ @Nullable Map<String, String> colNameMapping) {
this.fileIO = fileIO;
this.path = path;
List<DataField> fields = rowType.getFields();
@@ -82,15 +83,15 @@ public final class FileIndexWriter implements Closeable {
for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
fileIndexOptions.entrySet()) {
FileIndexOptions.Column entryColumn = entry.getKey();
- String tempName = entryColumn.getColumnName();
- if (evolutionMap != null) {
- tempName = evolutionMap.getOrDefault(tempName, null);
- if (tempName == null) {
+ String colName = entryColumn.getColumnName();
+ if (colNameMapping != null) {
+ colName = colNameMapping.getOrDefault(colName, null);
+ if (colName == null) {
continue;
}
}
- final String columnName = tempName;
+ String columnName = colName;
DataField field = map.get(columnName);
if (field == null) {
throw new IllegalArgumentException(columnName + " does not exist in column fields");
@@ -98,6 +99,7 @@ public final class FileIndexWriter implements Closeable {
for (Map.Entry<String, Options> typeEntry :
entry.getValue().entrySet()) {
String indexType = typeEntry.getKey();
+ IndexMaintainer maintainer = indexMaintainers.get(columnName);
if (entryColumn.isNestedColumn()) {
if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
throw new IllegalArgumentException(
@@ -105,34 +107,36 @@ public final class FileIndexWriter implements Closeable {
+ columnName
+ " is nested column, but is not map type. Only should map type yet.");
}
- MapType mapType = (MapType) field.type();
- ((MapFileIndexMaintainer)
- indexMaintainers.computeIfAbsent(
- columnName,
- name ->
- new MapFileIndexMaintainer(
- columnName,
- indexType,
- mapType.getKeyType(),
- mapType.getValueType(),
- fileIndexOptions.getMapTopLevelOptions(
- columnName, typeEntry.getKey()),
- index.get(columnName))))
- .add(entryColumn.getNestedColumnName(), typeEntry.getValue());
+ MapFileIndexMaintainer mapMaintainer = (MapFileIndexMaintainer) maintainer;
+ if (mapMaintainer == null) {
+ MapType mapType = (MapType) field.type();
+ mapMaintainer =
+ new MapFileIndexMaintainer(
+ columnName,
+ indexType,
+ mapType.getKeyType(),
+ mapType.getValueType(),
+ fileIndexOptions.getMapTopLevelOptions(
+ columnName, typeEntry.getKey()),
+ index.get(columnName));
+ indexMaintainers.put(columnName, mapMaintainer);
+ }
+ mapMaintainer.add(entryColumn.getNestedColumnName(), typeEntry.getValue());
} else {
- indexMaintainers.computeIfAbsent(
- columnName,
- name ->
- new FileIndexMaintainer(
- columnName,
- indexType,
- FileIndexer.create(
- indexType,
- field.type(),
- typeEntry.getValue())
- .createWriter(),
- InternalRow.createFieldGetter(
- field.type(), index.get(columnName))));
+ if (maintainer == null) {
+ maintainer =
+ new FileIndexMaintainer(
+ columnName,
+ indexType,
+ FileIndexer.create(
+ indexType,
+ field.type(),
+ typeEntry.getValue())
+ .createWriter(),
+ InternalRow.createFieldGetter(
+ field.type(), index.get(columnName)));
+ indexMaintainers.put(columnName, maintainer);
+ }
}
}
}
@@ -149,18 +153,18 @@ public final class FileIndexWriter implements Closeable {
public void close() throws IOException {
Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(out)) {
writer.writeColumnIndexes(indexMaps);
}
- if (baos.size() > inManifestThreshold) {
+ if (out.size() > inManifestThreshold) {
try (OutputStream outputStream = fileIO.newOutputStream(path,
true)) {
- outputStream.write(baos.toByteArray());
+ outputStream.write(out.toByteArray());
}
resultFileName = path.getName();
} else {
- embeddedIndexBytes = baos.toByteArray();
+ embeddedIndexBytes = out.toByteArray();
}
}
@@ -182,21 +186,21 @@ public final class FileIndexWriter implements Closeable {
}
@Nullable
- public static FileIndexWriter create(
+ public static DataFileIndexWriter create(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions
fileIndexOptions) {
return create(fileIO, path, rowType, fileIndexOptions, null);
}
@Nullable
- public static FileIndexWriter create(
+ public static DataFileIndexWriter create(
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
- @Nullable Map<String, String> evolutionMap) {
+ @Nullable Map<String, String> colNameMapping) {
return fileIndexOptions.isEmpty()
? null
- : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, evolutionMap);
+ : new DataFileIndexWriter(fileIO, path, rowType, fileIndexOptions, colNameMapping);
}
/** File index result. */
@@ -238,13 +242,13 @@ public final class FileIndexWriter implements Closeable {
private final String columnName;
private final String indexType;
- private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter;
+ private final FileIndexWriter fileIndexWriter;
private final InternalRow.FieldGetter getter;
public FileIndexMaintainer(
String columnName,
String indexType,
- org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
+ FileIndexWriter fileIndexWriter,
InternalRow.FieldGetter getter) {
this.columnName = columnName;
this.indexType = indexType;
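The renamed `colNameMapping` parameter of `DataFileIndexWriter` translates index columns configured against the current table schema into the names used when an older data file was written; a configured column with no mapping is skipped. `createIndexNameMapping` itself is not part of this diff, so the sketch below assumes fields are matched by a stable field id. `ColNameMappingSketch`, `Field`, `createNameMapping` and `resolveIndexColumns` are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: resolve configured index columns against an older file schema. */
final class ColNameMappingSketch {

    /** Stand-in for a schema field: a stable id plus its name in one schema version. */
    static final class Field {
        final int id;
        final String name;

        Field(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Current column name -> file column name, matched by field id (an assumption). */
    static Map<String, String> createNameMapping(List<Field> currentFields, List<Field> fileFields) {
        Map<Integer, String> fileNameById = new HashMap<>();
        for (Field f : fileFields) {
            fileNameById.put(f.id, f.name);
        }
        Map<String, String> mapping = new HashMap<>();
        for (Field f : currentFields) {
            String fileName = fileNameById.get(f.id);
            if (fileName != null) {
                mapping.put(f.name, fileName);
            }
        }
        return mapping;
    }

    /** A null mapping means the file was written with the current schema. */
    static List<String> resolveIndexColumns(List<String> configured, Map<String, String> colNameMapping) {
        List<String> resolved = new ArrayList<>();
        for (String configuredName : configured) {
            String colName = configuredName;
            if (colNameMapping != null) {
                colName = colNameMapping.get(configuredName);
                if (colName == null) {
                    continue; // column does not exist in this file: nothing to index
                }
            }
            resolved.add(colName);
        }
        return resolved;
    }
}
```

With this shape, a column renamed after the file was written is still indexed under its old name, and a column added later is simply skipped for that file.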
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index f7da2760e..6c7090f3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -72,11 +72,11 @@ public class DataFilePathFactory {
return uuid;
}
- public static Path toFileIndexPath(Path filePath) {
- return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX);
+ public static Path dataFileToFileIndexPath(Path dataFilePath) {
+ return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX);
}
- public static Path fileIndexPathIncrease(Path filePath) {
+ public static Path createNewFileIndexFilePath(Path filePath) {
String fileName = filePath.getName();
int dot = fileName.lastIndexOf(".");
int dash = fileName.lastIndexOf("-");
@@ -87,8 +87,8 @@ public class DataFilePathFactory {
return new Path(
filePath.getParent(),
fileName.substring(0, dash + 1) + (num + 1) +
INDEX_PATH_SUFFIX);
- } catch (NumberFormatException e) {
- // ignore
+ } catch (NumberFormatException ignore) {
+ // the first index file has no number suffix yet
}
}
return new Path(
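The renamed helpers encode a naming scheme: `dataFileToFileIndexPath` appends `INDEX_PATH_SUFFIX` to the data file name, and `createNewFileIndexFilePath` bumps a trailing `-N` counter, presumably so that a rewrite produces a new index file instead of overwriting the previous one. The string-only sketch below reproduces the visible part of that logic. The `.index` suffix value and the fallback branch (cut off in the hunk above) are assumptions, and `IndexFileNaming` is a hypothetical name.

```java
/**
 * Hypothetical sketch of counter-based index file naming. Assumes the index suffix is
 * ".index"; the fallback for a first index file is also assumed.
 */
final class IndexFileNaming {

    // Assumption: value of INDEX_PATH_SUFFIX, which this diff does not show.
    static final String INDEX_SUFFIX = ".index";

    private IndexFileNaming() {}

    /** Index file name that sits next to a data file. */
    static String indexFileNameFor(String dataFileName) {
        return dataFileName + INDEX_SUFFIX;
    }

    /** Name for the next version of an index file, bumping a trailing "-N" counter. */
    static String nextIndexFileName(String indexFileName) {
        int dot = indexFileName.lastIndexOf('.');
        int dash = indexFileName.lastIndexOf('-');
        if (dash != -1 && dot > dash) {
            try {
                int num = Integer.parseInt(indexFileName.substring(dash + 1, dot));
                return indexFileName.substring(0, dash + 1) + (num + 1) + INDEX_SUFFIX;
            } catch (NumberFormatException ignore) {
                // Text after the last dash is not a counter: this is the first index file.
            }
        }
        // Hypothetical fallback: start counting at 1.
        String base = dot == -1 ? indexFileName : indexFileName.substring(0, dot);
        return base + "-1" + INDEX_SUFFIX;
    }
}
```

Under these assumptions, `data-abc-0.parquet.index` becomes `data-abc-0.parquet-1.index`, which then becomes `data-abc-0.parquet-2.index`.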
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 3da909817..ae18768c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.function.Function;
-import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
/**
* A {@link StatsCollectingSingleFileWriter} to write data files containing
{@link InternalRow}.
@@ -48,7 +48,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
private final long schemaId;
private final LongCounter seqNumCounter;
private final SimpleStatsConverter statsArraySerializer;
- @Nullable private final FileIndexWriter fileIndexWriter;
+ @Nullable private final DataFileIndexWriter dataFileIndexWriter;
private final FileSource fileSource;
public RowDataFileWriter(
@@ -75,9 +75,9 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new SimpleStatsConverter(writeSchema);
- this.fileIndexWriter =
- FileIndexWriter.create(
- fileIO, toFileIndexPath(path), writeSchema, fileIndexOptions);
+ this.dataFileIndexWriter =
+ DataFileIndexWriter.create(
+ fileIO, dataFileToFileIndexPath(path), writeSchema, fileIndexOptions);
this.fileSource = fileSource;
}
@@ -85,16 +85,16 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
public void write(InternalRow row) throws IOException {
super.write(row);
// add row to index if needed
- if (fileIndexWriter != null) {
- fileIndexWriter.write(row);
+ if (dataFileIndexWriter != null) {
+ dataFileIndexWriter.write(row);
}
seqNumCounter.add(1L);
}
@Override
public void close() throws IOException {
- if (fileIndexWriter != null) {
- fileIndexWriter.close();
+ if (dataFileIndexWriter != null) {
+ dataFileIndexWriter.close();
}
super.close();
}
@@ -102,8 +102,10 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
@Override
public DataFileMeta result() throws IOException {
SimpleStats stats = statsArraySerializer.toBinary(fieldStats());
- FileIndexWriter.FileIndexResult indexResult =
- fileIndexWriter == null ? FileIndexWriter.EMPTY_RESULT : fileIndexWriter.result();
+ DataFileIndexWriter.FileIndexResult indexResult =
+ dataFileIndexWriter == null
+ ? DataFileIndexWriter.EMPTY_RESULT
+ : dataFileIndexWriter.result();
return DataFileMeta.forAppend(
path.getName(),
fileIO.getFileSize(path),
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index 6d3e1a961..83c2be59a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -23,13 +23,8 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.io.DataInputViewStreamWrapper;
-import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.utils.VersionedObjectSerializer;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.util.function.Function;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -82,19 +77,6 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE
dataFileMetaSerializer.fromRow(row.getRow(4,
dataFileMetaSerializer.numFields())));
}
- public byte[] serializeToBytes(ManifestEntry manifestEntry) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
- serialize(manifestEntry, view);
- return out.toByteArray();
- }
-
- public ManifestEntry deserializeFromBytes(byte[] bytes) throws IOException {
- ByteArrayInputStream in = new ByteArrayInputStream(bytes);
- DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
- return deserialize(view);
- }
-
public static Function<InternalRow, BinaryRow> partitionGetter() {
return row -> deserializeBinaryRow(row.getBinary(2));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
index 329dee230..a83d80549 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
@@ -112,6 +112,19 @@ public abstract class ObjectSerializer<T> implements Serializable {
return deserializeList(view);
}
+ public byte[] serializeToBytes(T record) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+ serialize(record, view);
+ return out.toByteArray();
+ }
+
+ public T deserializeFromBytes(byte[] bytes) throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+ DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+ return deserialize(view);
+ }
+
/** Convert a {@link T} to {@link InternalRow}. */
public abstract InternalRow toRow(T record);
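This commit moves `serializeToBytes` and `deserializeFromBytes` out of `ManifestEntrySerializer` and into the generic `ObjectSerializer<T>`, so every serializer gets a byte-array round trip. The same pattern in plain JDK terms is sketched below, assuming `java.io.DataOutputStream` and `DataInputStream` in place of Paimon's `DataOutputViewStreamWrapper` and `DataInputViewStreamWrapper`; `StreamSerializer` and `Utf8StringSerializer` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical base class: subclasses implement stream methods and inherit byte[] helpers. */
abstract class StreamSerializer<T> {

    abstract void serialize(T record, DataOutput out) throws IOException;

    abstract T deserialize(DataInput in) throws IOException;

    /** Serializes one record into a standalone byte array. */
    byte[] serializeToBytes(T record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serialize(record, out);
        } // closing flushes the DataOutputStream into the byte buffer
        return bytes.toByteArray();
    }

    /** Inverse of serializeToBytes. */
    T deserializeFromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return deserialize(in);
        }
    }
}

/** Example subclass: only the stream methods are written by hand. */
final class Utf8StringSerializer extends StreamSerializer<String> {

    @Override
    void serialize(String record, DataOutput out) throws IOException {
        out.writeUTF(record);
    }

    @Override
    String deserialize(DataInput in) throws IOException {
        return in.readUTF();
    }
}
```

Keeping the helpers in the base class means each concrete serializer only implements the stream methods, instead of duplicating the byte-array plumbing as `ManifestEntrySerializer` did before this change.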
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
similarity index 83%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
index 21582ae28..a85094b9a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.FileIndexSink;
import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
-import org.apache.paimon.flink.source.FileIndexScanSource;
+import org.apache.paimon.flink.sink.RewriteFileIndexSink;
+import org.apache.paimon.flink.source.RewriteFileIndexSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.predicate.Predicate;
@@ -46,12 +46,12 @@ import java.util.Map;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
-/** Migrate procedure to migrate hive table to paimon table. */
-public class FileIndexProcedure extends ProcedureBase {
+/** Rewrite file index procedure to regenerate all file indexes. */
+public class RewriteFileIndexProcedure extends ProcedureBase {
@Override
public String identifier() {
- return "file_index_rewrite";
+ return "rewrite_file_index";
}
public String[] call(ProcedureContext procedureContext, String
sourceTablePath)
@@ -89,30 +89,20 @@ public class FileIndexProcedure extends ProcedureBase {
partitionPredicate = null;
}
- TopoBuilder.build(env, (FileStoreTable) table, partitionPredicate);
-
+ FileStoreTable storeTable = (FileStoreTable) table;
+ DataStreamSource<ManifestEntry> source =
+ env.fromSource(
+ new RewriteFileIndexSource(storeTable, partitionPredicate),
+ WatermarkStrategy.noWatermarks(),
+ "index source",
+ new ManifestEntryTypeInfo());
+ new RewriteFileIndexSink(storeTable).sinkFrom(source);
return execute(env, "Add file index for table: " + sourceTablePath);
}
- private static class TopoBuilder {
-
- public static void build(
- StreamExecutionEnvironment env,
- FileStoreTable table,
- Predicate partitionPredicate) {
- DataStreamSource<ManifestEntry> source =
- env.fromSource(
- new FileIndexScanSource(table, partitionPredicate),
- WatermarkStrategy.noWatermarks(),
- "index source",
- new TypeInfo());
- new FileIndexSink(table).sinkFrom(source);
- }
- }
-
- private static class TypeInfo extends GenericTypeInfo<ManifestEntry> {
+ private static class ManifestEntryTypeInfo extends GenericTypeInfo<ManifestEntry> {
- public TypeInfo() {
+ public ManifestEntryTypeInfo() {
super(ManifestEntry.class);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
similarity index 78%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index a69f3f23c..39dcca03c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -23,14 +23,14 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFormat;
import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.flink.procedure.FileIndexProcedure;
+import org.apache.paimon.flink.procedure.RewriteFileIndexProcedure;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileIndexWriter;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.FileIndexWriter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
@@ -45,13 +45,14 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
-import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -64,13 +65,13 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.paimon.io.DataFilePathFactory.fileIndexPathIncrease;
-import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+import static org.apache.paimon.io.DataFilePathFactory.createNewFileIndexFilePath;
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
-/** File index sink for {@link FileIndexProcedure}. */
-public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
+/** File index sink for {@link RewriteFileIndexProcedure}. */
+public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
- public FileIndexSink(FileStoreTable table) {
+ public RewriteFileIndexSink(FileStoreTable table) {
super(table, null);
}
@@ -84,6 +85,8 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
private static class FileIndexModificationOperator
extends PrepareCommitOperator<ManifestEntry, Committable> {
+ private static final long serialVersionUID = 1L;
+
private final FileStoreTable table;
private transient FileIndexProcessor fileIndexProcessor;
@@ -145,7 +148,7 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
private final FileIO fileIO;
private final FileStorePathFactory pathFactory;
private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory>
dataFilePathFactoryMap;
- private final SchemaCache schemaCache;
+ private final SchemaCache schemaInfoCache;
private final long sizeInMeta;
public FileIndexProcessor(FileStoreTable table) {
@@ -154,7 +157,7 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
this.fileIO = table.fileIO();
this.pathFactory = table.store().pathFactory();
this.dataFilePathFactoryMap = new HashMap<>();
- this.schemaCache =
+ this.schemaInfoCache =
new SchemaCache(fileIndexOptions, new
SchemaManager(fileIO, table.location()));
this.sizeInMeta =
table.coreOptions().fileIndexInManifestThreshold();
}
@@ -166,13 +169,7 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
Pair.of(partition, bucket),
p ->
pathFactory.createDataFilePathFactory(partition, bucket));
- Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>> t4 =
- schemaCache.schemaInfo(dataFileMeta.schemaId());
- RowType fileSchema = t4.f0;
- Map<String, String> evolutionNameMap = t4.f1;
- int[] projection = t4.f2;
- Map<String, Set<String>> expectedFileIndex = t4.f3;
-
+ SchemaInfo schemaInfo = schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
List<String> indexFiles =
dataFileMeta.extraFiles().stream()
@@ -188,20 +185,22 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
try (FileIndexFormat.Reader indexReader =
FileIndexFormat.createReader(
fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)),
- fileSchema)) {
+ schemaInfo.fileSchema)) {
maintainers = indexReader.readAll();
}
- newIndexPath = fileIndexPathIncrease(dataFilePathFactory.toPath(indexFile));
+ newIndexPath = createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile));
} else {
maintainers = new HashMap<>();
- newIndexPath = toFileIndexPath(dataFilePathFactory.toPath(dataFileMeta.fileName()));
+ newIndexPath =
+ dataFileToFileIndexPath(
+ dataFilePathFactory.toPath(dataFileMeta.fileName()));
}
// remove unnecessary
for (Map.Entry<String, Map<String, byte[]>> entry :
new HashSet<>(maintainers.entrySet())) {
String name = entry.getKey();
- if (!expectedFileIndex.containsKey(name)) {
+ if (!schemaInfo.projectedColFullNames.contains(name)) {
maintainers.remove(name);
} else {
Map<String, byte[]> indexTypeBytes = maintainers.get(name);
@@ -213,18 +212,19 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
}
}
- // ignore close
- FileIndexWriter fileIndexWriter =
- FileIndexWriter.create(
+ // Intentionally not closed: closing would write an index file, but only the serialized maintainers are needed here.
+ @SuppressWarnings("resource")
+ DataFileIndexWriter dataFileIndexWriter =
+ DataFileIndexWriter.create(
fileIO,
newIndexPath,
- fileSchema.project(projection),
+ schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols),
fileIndexOptions,
- evolutionNameMap);
- if (fileIndexWriter != null) {
+ schemaInfo.colNameMapping);
+ if (dataFileIndexWriter != null) {
try (RecordReader<InternalRow> reader =
table.newReadBuilder()
- .withProjection(projection)
+ .withProjection(schemaInfo.projectedIndexCols)
.newRead()
.createReader(
DataSplit.builder()
@@ -238,10 +238,10 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
Collections.singletonList(dataFileMeta))
.rawConvertible(true)
.build())) {
- reader.forEachRemaining(fileIndexWriter::write);
+ reader.forEachRemaining(dataFileIndexWriter::write);
}
- fileIndexWriter
+ dataFileIndexWriter
.serializeMaintainers()
.forEach(
(key, value) ->
@@ -276,38 +276,37 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
private final FileIndexOptions fileIndexOptions;
private final SchemaManager schemaManager;
- private final TableSchema latest;
- private final Map<
- Long, Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>>>
- schemaInfos;
-
+ private final TableSchema currentSchema;
+ private final Map<Long, SchemaInfo> schemaInfos;
private final Set<Long> fileSchemaIds;
public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager schemaManager) {
this.fileIndexOptions = fileIndexOptions;
this.schemaManager = schemaManager;
- this.latest = schemaManager.latest().orElseThrow(RuntimeException::new);
+ this.currentSchema = schemaManager.latest().orElseThrow(RuntimeException::new);
this.schemaInfos = new HashMap<>();
this.fileSchemaIds = new HashSet<>();
}
- public Tuple4<RowType, Map<String, String>, int[], Map<String, Set<String>>> schemaInfo(
- long schemaId) {
+ public SchemaInfo schemaInfo(long schemaId) {
if (!fileSchemaIds.contains(schemaId)) {
RowType fileSchema = schemaManager.schema(schemaId).logicalRowType();
- Map<String, String> evolutionmap =
- schemaId == latest.id()
+
+ @Nullable
+ Map<String, String> colNameMapping =
+ schemaId == currentSchema.id()
? null
- : createIndexNameMapping(latest.fields(), fileSchema.getFields());
+ : createIndexNameMapping(
+ currentSchema.fields(), fileSchema.getFields());
- List<String> projectedColumnNames = new ArrayList<>();
- Map<String, Set<String>> expectedIndexNameType = new HashMap<>();
+ List<String> projectedColNames = new ArrayList<>();
+ Set<String> projectedColFullNames = new HashSet<>();
for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
fileIndexOptions.entrySet()) {
FileIndexOptions.Column column = entry.getKey();
String columnName;
- if (evolutionmap != null) {
- columnName = evolutionmap.getOrDefault(column.getColumnName(), null);
+ if (colNameMapping != null) {
+ columnName = colNameMapping.getOrDefault(column.getColumnName(), null);
// if column name has no corresponding field, then we just skip it
if (columnName == null) {
continue;
@@ -315,26 +314,24 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
} else {
columnName = column.getColumnName();
}
- projectedColumnNames.add(columnName);
+ projectedColNames.add(columnName);
String fullColumnName =
column.isNestedColumn()
? FileIndexCommon.toMapKey(
columnName,
column.getNestedColumnName())
: column.getColumnName();
- expectedIndexNameType
- .computeIfAbsent(fullColumnName, name -> new HashSet<>())
- .addAll(entry.getValue().keySet());
+ projectedColFullNames.add(fullColumnName);
}
schemaInfos.put(
schemaId,
- Tuple4.of(
+ new SchemaInfo(
fileSchema,
- evolutionmap,
- projectedColumnNames.stream()
+ colNameMapping,
+ projectedColNames.stream()
.mapToInt(fileSchema::getFieldIndex)
.toArray(),
- expectedIndexNameType));
+ projectedColFullNames));
fileSchemaIds.add(schemaId);
}
@@ -359,4 +356,23 @@ public class FileIndexSink extends FlinkWriteSink<ManifestEntry> {
return indexMapping;
}
}
+
+ private static class SchemaInfo {
+
+ private final RowType fileSchema;
+ private final Map<String, String> colNameMapping;
+ private final int[] projectedIndexCols;
+ private final Set<String> projectedColFullNames;
+
+ private SchemaInfo(
+ RowType fileSchema,
+ Map<String, String> colNameMapping,
+ int[] projectedIndexCols,
+ Set<String> projectedColFullNames) {
+ this.fileSchema = fileSchema;
+ this.colNameMapping = colNameMapping;
+ this.projectedIndexCols = projectedIndexCols;
+ this.projectedColFullNames = projectedColFullNames;
+ }
+ }
}
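The `SchemaCache` changes above resolve, once per file schema id, which configured index columns exist in that data file and under which names, and cache the result in a named `SchemaInfo` instead of a `Tuple4`. A minimal, hypothetical Java model of that bookkeeping follows; it is not Paimon code. It assumes fields can be reduced to `(id, name)` pairs and that the column-name mapping matches current and file fields by field id (the body of `createIndexNameMapping` is not part of this diff), it ignores nested columns, and the names `SchemaInfoSketch`, `Field`, `Info`, `nameMapping`, `build`, and `prune` are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the per-schema info cached by SchemaCache; not Paimon code. */
public class SchemaInfoSketch {

    /** A schema field reduced to a stable field id and a name (assumption). */
    public static final class Field {
        public final int id;
        public final String name;

        public Field(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Named holder in the spirit of SchemaInfo: projected positions plus column names. */
    public static final class Info {
        public final int[] projectedIndexCols;
        public final Set<String> projectedColFullNames;

        Info(int[] projectedIndexCols, Set<String> projectedColFullNames) {
            this.projectedIndexCols = projectedIndexCols;
            this.projectedColFullNames = projectedColFullNames;
        }
    }

    /** Maps current column names to the names used in an older file schema, by field id. */
    public static Map<String, String> nameMapping(List<Field> current, List<Field> file) {
        Map<Integer, String> fileNameById = new HashMap<>();
        for (Field f : file) {
            fileNameById.put(f.id, f.name);
        }
        Map<String, String> mapping = new HashMap<>();
        for (Field f : current) {
            String fileName = fileNameById.get(f.id);
            if (fileName != null) {
                mapping.put(f.name, fileName);
            }
        }
        return mapping;
    }

    /** Resolves the configured index columns (current names) against one file schema. */
    public static Info build(
            List<Field> current,
            List<Field> file,
            List<String> indexedColumns,
            boolean isCurrentSchema) {
        // No mapping is needed when the file was written with the current schema.
        Map<String, String> mapping = isCurrentSchema ? null : nameMapping(current, file);
        List<String> fileNames = new ArrayList<>();
        for (Field f : file) {
            fileNames.add(f.name);
        }
        List<Integer> positions = new ArrayList<>();
        Set<String> fullNames = new HashSet<>();
        for (String column : indexedColumns) {
            String fileColumn = mapping == null ? column : mapping.get(column);
            int position = fileColumn == null ? -1 : fileNames.indexOf(fileColumn);
            if (position < 0) {
                continue; // no counterpart in this file schema, so skip the column
            }
            positions.add(position);
            fullNames.add(fileColumn);
        }
        int[] projected = positions.stream().mapToInt(Integer::intValue).toArray();
        return new Info(projected, fullNames);
    }

    /** Drops serialized index entries whose column is no longer projected for indexing. */
    public static void prune(Map<String, Map<String, byte[]>> maintainers, Info info) {
        maintainers.keySet().removeIf(name -> !info.projectedColFullNames.contains(name));
    }
}
```

For a file written before `v` was renamed to `v2` and before `w` was added, configuring `k,v2,w` yields positions `[0, 1]` and names `{k, v}` in this sketch, and pruning keeps only maintainers keyed by those names. Keeping the four values in a class with named fields makes call sites such as `schemaInfo.projectedIndexCols` self-describing, which positional `Tuple4` accessors are not.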
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java
index fa83aad83..9a488613f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java
@@ -51,16 +51,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
-public class FileIndexScanSource
+public class RewriteFileIndexSource
implements Source<
- ManifestEntry, FileIndexScanSource.Split, FileIndexScanSource.CheckpointState> {
+ ManifestEntry,
+ RewriteFileIndexSource.Split,
+ RewriteFileIndexSource.CheckpointState> {
- private static final long serialVersionUID = 2319102734891237489L;
+ private static final long serialVersionUID = 1L;
private final FileStoreTable table;
@Nullable private final Predicate partitionPredicate;
- public FileIndexScanSource(FileStoreTable table, @Nullable Predicate partitionPredicate) {
+ public RewriteFileIndexSource(FileStoreTable table, @Nullable Predicate partitionPredicate) {
this.table = table;
this.partitionPredicate = partitionPredicate;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 2fe8403dc..c41539091 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -32,7 +32,7 @@ org.apache.paimon.flink.action.QueryServiceActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
org.apache.paimon.flink.procedure.CompactProcedure
-org.apache.paimon.flink.procedure.FileIndexProcedure
+org.apache.paimon.flink.procedure.RewriteFileIndexProcedure
org.apache.paimon.flink.procedure.CreateTagProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
org.apache.paimon.flink.procedure.CreateBranchProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
index 1fbbb5869..d465287ff 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
@@ -40,8 +40,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-/** IT Case for {@link FileIndexProcedure}. */
-public class FileIndexProcedureITCase extends CatalogITCaseBase {
+/** IT Case for {@link RewriteFileIndexProcedure}. */
+public class RewriteFileIndexProcedureITCase extends CatalogITCaseBase {
@Test
public void testFileIndexProcedureAddIndex() throws Exception {
@@ -61,7 +61,7 @@ public class FileIndexProcedureITCase extends CatalogITCaseBase {
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')");
- sql("CALL sys.file_index_rewrite('default.T')");
+ sql("CALL sys.rewrite_file_index('default.T')");
FileStoreTable table = paimonTable("T");
List<ManifestEntry> list = table.store().newScan().plan().files();
@@ -156,7 +156,7 @@ public class FileIndexProcedureITCase extends CatalogITCaseBase {
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='order_id,v')");
- sql("CALL sys.file_index_rewrite('default.T')");
+ sql("CALL sys.rewrite_file_index('default.T')");
reader =
table.newRead()
@@ -188,7 +188,7 @@ public class FileIndexProcedureITCase extends CatalogITCaseBase {
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k')");
- sql("CALL sys.file_index_rewrite('default.T')");
+ sql("CALL sys.rewrite_file_index('default.T')");
FileStoreTable table = paimonTable("T");
List<ManifestEntry> list = table.store().newScan().plan().files();
@@ -219,7 +219,7 @@ public class FileIndexProcedureITCase extends CatalogITCaseBase {
sql("ALTER TABLE T RESET ('file-index.bloom-filter.columns')");
- sql("CALL sys.file_index_rewrite('default.T')");
+ sql("CALL sys.rewrite_file_index('default.T')");
table = paimonTable("T");
list = table.store().newScan().plan().files();