This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b3eeea91e [core] Introduce file index read/write framework. (#3177)
b3eeea91e is described below
commit b3eeea91e8e1a862b375732333768f823c3fb770
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 15 18:54:33 2024 +0800
[core] Introduce file index read/write framework. (#3177)
---
.../shortcodes/generated/core_configuration.html | 12 ++
.../main/java/org/apache/paimon/CoreOptions.java | 97 +++++++++++
.../apache/paimon/fileindex/FileIndexFormat.java | 17 +-
.../apache/paimon/fileindex/FileIndexOptions.java | 69 ++++++++
.../bloomfilter/BloomFilterFileIndex.java | 22 +--
.../org/apache/paimon/utils/BloomFilter64.java | 42 ++++-
.../org/apache/paimon/AppendOnlyFileStore.java | 6 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 9 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 54 +++++-
.../apache/paimon/io/DataFileMetaSerializer.java | 6 +-
.../org/apache/paimon/io/DataFilePathFactory.java | 6 +
.../apache/paimon/io/FileIndexRecordReader.java | 90 ++++++++++
.../java/org/apache/paimon/io/FileIndexWriter.java | 194 +++++++++++++++++++++
.../org/apache/paimon/io/FileRecordReader.java | 36 ++--
.../apache/paimon/io/KeyValueDataFileWriter.java | 4 +-
.../org/apache/paimon/io/RowDataFileWriter.java | 31 +++-
.../apache/paimon/io/RowDataRollingFileWriter.java | 7 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 45 ++++-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 9 +-
.../apache/paimon/operation/RawFileSplitRead.java | 94 +++++++---
.../apache/paimon/stats/FieldStatsConverters.java | 15 ++
.../AppendOnlyTableCompactionCoordinatorTest.java | 3 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 4 +-
.../paimon/crosspartition/IndexBootstrapTest.java | 3 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 4 +-
.../paimon/io/DataFileTestDataGenerator.java | 3 +-
.../org/apache/paimon/io/DataFileTestUtils.java | 9 +-
.../apache/paimon/io/RollingFileWriterTest.java | 4 +-
.../ManifestCommittableSerializerTest.java | 3 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 6 +-
.../org/apache/paimon/mergetree/LevelsTest.java | 14 +-
.../mergetree/compact/IntervalPartitionTest.java | 3 +-
.../mergetree/compact/UniversalCompactionTest.java | 2 +-
.../paimon/operation/ExpireSnapshotsTest.java | 3 +-
.../paimon/table/AppendOnlyFileStoreTableTest.java | 149 ++++++++++++++++
.../paimon/table/source/SplitGeneratorTest.java | 3 +-
.../flink/UnawareBucketAppendOnlyTableITCase.java | 12 +-
.../sink/CompactionTaskSimpleSerializerTest.java | 3 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 3 +-
.../source/FileStoreSourceSplitSerializerTest.java | 3 +-
.../flink/source/TestChangelogDataReadWrite.java | 3 +-
42 files changed, 994 insertions(+), 111 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 8d39919d5..4c42bbfbb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -206,6 +206,18 @@ under the License.
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a
partitioned table with dynamic partition columns. Works only when the table has
partition keys.</td>
</tr>
+ <tr>
+ <td><h5>file-index.in-manifest-threshold</h5></td>
+ <td style="word-wrap: break-word;">500 bytes</td>
+ <td>MemorySize</td>
+ <td>The threshold to store file index bytes in manifest.</td>
+ </tr>
+ <tr>
+ <td><h5>file-index.read.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether enabled read file index.</td>
+ </tr>
<tr>
<td><h5>file-reader-async-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0d78d2ec8..b73133bae 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
@@ -70,6 +71,10 @@ public class CoreOptions implements Serializable {
public static final String DISTINCT = "distinct";
+ public static final String FILE_INDEX = "file-index";
+
+ public static final String COLUMNS = "columns";
+
public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
@@ -135,6 +140,18 @@ public class CoreOptions implements Serializable {
"Default file compression format, orc is lz4 and
parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());
+ public static final ConfigOption<MemorySize>
FILE_INDEX_IN_MANIFEST_THRESHOLD =
+ key("file-index.in-manifest-threshold")
+ .memoryType()
+ .defaultValue(MemorySize.parse("500 B"))
+ .withDescription("The threshold to store file index bytes
in manifest.");
+
+ public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
+ key("file-index.read.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether enabled read file index.");
+
public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
key("manifest.format")
.enumType(FileFormatType.class)
@@ -1703,6 +1720,86 @@ public class CoreOptions implements Serializable {
return options.get(DELETION_VECTORS_ENABLED);
}
+ public FileIndexOptions indexColumnsOptions() {
+ String fileIndexPrefix = FILE_INDEX + ".";
+ String fileIndexColumnSuffix = "." + COLUMNS;
+
+ FileIndexOptions fileIndexOptions = new
FileIndexOptions(fileIndexInManifestThreshold());
+ for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(fileIndexPrefix)) {
+ // start with file-index, decode this option
+ if (key.endsWith(fileIndexColumnSuffix)) {
+ // if it ends with .columns, set up the indexes for the listed columns
+ String indexType =
+ key.substring(
+ fileIndexPrefix.length(),
+ key.length() -
fileIndexColumnSuffix.length());
+ String[] names = entry.getValue().split(",");
+ for (String name : names) {
+ if (StringUtils.isBlank(name)) {
+ throw new IllegalArgumentException(
+ "Wrong option in " + key + ", should not
have empty column");
+ }
+ fileIndexOptions.computeIfAbsent(name.trim(),
indexType);
+ }
+ } else {
+ // else, it must be an option
+ String[] kv =
key.substring(fileIndexPrefix.length()).split("\\.");
+ if (kv.length != 3) {
+ continue;
+ }
+ String indexType = kv[0];
+ String cname = kv[1];
+ String opkey = kv[2];
+
+ if (fileIndexOptions.get(cname, indexType) == null) {
+ // if the indexes have not been set yet, find .columns in options,
then set them
+ String columns =
+ options.get(fileIndexPrefix + indexType +
fileIndexColumnSuffix);
+ if (columns == null) {
+ continue;
+ }
+ String[] names = columns.split(",");
+ boolean foundTarget = false;
+ for (String name : names) {
+ if (StringUtils.isBlank(name)) {
+ throw new IllegalArgumentException(
+ "Wrong option in "
+ + key
+ + ", should not have empty
column");
+ }
+ String tname = name.trim();
+ if (cname.equals(tname)) {
+ foundTarget = true;
+ }
+ fileIndexOptions.computeIfAbsent(name.trim(),
indexType);
+ }
+ if (!foundTarget) {
+ throw new IllegalArgumentException(
+ "Wrong option in "
+ + key
+ + ", can't found column "
+ + cname
+ + " in "
+ + columns);
+ }
+ }
+ fileIndexOptions.get(cname, indexType).set(opkey,
entry.getValue());
+ }
+ }
+ }
+ return fileIndexOptions;
+ }
+
+ public long fileIndexInManifestThreshold() {
+ return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes();
+ }
+
+ public boolean fileIndexReadEnabled() {
+ return options.get(FILE_INDEX_READ_ENABLED);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.",
true, true),
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index ec809dff4..4f03faf90 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -46,12 +46,12 @@ import java.util.stream.Collectors;
* File index file format. Put all column and offset in the header.
*
* <pre>
- * _______________________________________ _____________________
+ * _____________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
- * | column size |
+ * | column number |
* |-------------------------------------|
- * | column 1 | index size |
+ * | column 1 | index number |
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------| HEAD
- * | column 2 | index size |
+ * | column 2 | index number |
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
@@ -82,14 +82,15 @@ import java.util.stream.Collectors;
* magic: 8 bytes long
* version: 4 bytes int
* head length: 4 bytes int
- * index type: var bytes utf (length + bytes)
- * body info size: 4 bytes int (how many column items below)
- * column name: var bytes utf
+ * column number: 4 bytes int
+ * column x: var bytes utf (length + bytes)
+ * index number: 4 bytes int (how many index items below)
+ * index name x: var bytes utf
* start pos: 4 bytes int
* length: 4 bytes int
* redundant length: 4 bytes int (for compatibility with later
versions, in this version, content is zero)
* redundant bytes: var bytes (for compatibility with later
version, in this version, is empty)
- * BODY: column bytes + column bytes + column
bytes + .......
+ * BODY: column index bytes + column index bytes +
column index bytes + .......
*
* </pre>
*/
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
new file mode 100644
index 000000000..e8751245c
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Options of file index column. */
+public class FileIndexOptions {
+
+ // if the filter size is greater than fileIndexInManifestThreshold, we put it
in file
+ private final long fileIndexInManifestThreshold;
+
+ private final Map<String, Map<String, Options>> indexTypeOptions;
+
+ public FileIndexOptions() {
+
this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
+ }
+
+ public FileIndexOptions(long fileIndexInManifestThreshold) {
+ this.indexTypeOptions = new HashMap<>();
+ this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
+ }
+
+ public void computeIfAbsent(String column, String indexType) {
+ indexTypeOptions
+ .computeIfAbsent(column, c -> new HashMap<>())
+ .computeIfAbsent(indexType, i -> new Options());
+ }
+
+ public Options get(String column, String indexType) {
+ return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null))
+ .map(x -> x.get(indexType))
+ .orElse(null);
+ }
+
+ public boolean isEmpty() {
+ return indexTypeOptions.isEmpty();
+ }
+
+ public long fileIndexInManifestThreshold() {
+ return fileIndexInManifestThreshold;
+ }
+
+ public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
+ return indexTypeOptions.entrySet();
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
index c0486d535..37ba4d205 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -25,11 +25,10 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
+import org.apache.paimon.utils.BloomFilter64.BitSet;
import org.apache.hadoop.util.bloom.HashFunction;
-import java.util.BitSet;
-
/**
* Bloom filter for file index.
*
@@ -40,7 +39,7 @@ import java.util.BitSet;
*/
public class BloomFilterFileIndex implements FileIndexer {
- public static final String BLOOM_FILTER = "bloom";
+ public static final String BLOOM_FILTER = "bloom-filter";
private static final int DEFAULT_ITEMS = 1_000_000;
private static final double DEFAULT_FPP = 0.1;
@@ -84,19 +83,21 @@ public class BloomFilterFileIndex implements FileIndexer {
@Override
public void write(Object key) {
- filter.addHash(hashFunction.hash(key));
+ if (key != null) {
+ filter.addHash(hashFunction.hash(key));
+ }
}
@Override
public byte[] serializedBytes() {
int numHashFunctions = filter.getNumHashFunctions();
- byte[] bytes = filter.getBitSet().toByteArray();
- byte[] serialized = new byte[bytes.length + Integer.BYTES];
+ byte[] serialized = new byte[filter.getBitSet().bitSize() /
Byte.SIZE + Integer.BYTES];
+ // big endian: most significant byte first
serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
serialized[3] = (byte) (numHashFunctions & 0xFF);
- System.arraycopy(bytes, 0, serialized, 4, bytes.length);
+ filter.getBitSet().toByteArray(serialized, 4, serialized.length -
4);
return serialized;
}
}
@@ -107,21 +108,20 @@ public class BloomFilterFileIndex implements FileIndexer {
private final FastHash hashFunction;
public Reader(DataType type, byte[] serializedBytes) {
+ // big endian: most significant byte first
int numHashFunctions =
((serializedBytes[0] << 24)
+ (serializedBytes[1] << 16)
+ (serializedBytes[2] << 8)
+ serializedBytes[3]);
- byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
- System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length);
- BitSet bitSet = BitSet.valueOf(bytes);
+ BitSet bitSet = new BitSet(serializedBytes, 4);
this.filter = new BloomFilter64(numHashFunctions, bitSet);
this.hashFunction = FastHash.getHashFunction(type);
}
@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
- return filter.testHash(hashFunction.hash(key));
+ return key == null || filter.testHash(hashFunction.hash(key));
}
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
index 75b8661a2..5a4be5a45 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
@@ -18,8 +18,6 @@
package org.apache.paimon.utils;
-import java.util.BitSet;
-
/** Bloom filter 64 handle 64 bits hash. */
public final class BloomFilter64 {
@@ -29,15 +27,15 @@ public final class BloomFilter64 {
public BloomFilter64(long items, double fpp) {
int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
- this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+ this.numBits = nb + (Byte.SIZE - (nb % Byte.SIZE));
this.numHashFunctions =
Math.max(1, (int) Math.round((double) numBits / items *
Math.log(2)));
- this.bitSet = new BitSet(numBits);
+ this.bitSet = new BitSet(new byte[numBits / Byte.SIZE], 0);
}
public BloomFilter64(int numHashFunctions, BitSet bitSet) {
this.numHashFunctions = numHashFunctions;
- this.numBits = bitSet.size();
+ this.numBits = bitSet.bitSize();
this.bitSet = bitSet;
}
@@ -81,4 +79,38 @@ public final class BloomFilter64 {
public BitSet getBitSet() {
return bitSet;
}
+
+ /** Bit set used for bloom filter 64. */
+ public static class BitSet {
+
+ private static final byte MAST = 0x07;
+
+ private final byte[] data;
+ private final int offset;
+
+ public BitSet(byte[] data, int offset) {
+ assert data.length > 0 : "data length is zero!";
+ assert offset >= 0 : "offset is negative!";
+ this.data = data;
+ this.offset = offset;
+ }
+
+ public void set(int index) {
+ data[(index >>> 3) + offset] |= (byte) ((byte) 1 << (index &
MAST));
+ }
+
+ public boolean get(int index) {
+ return (data[(index >>> 3) + offset] & ((byte) 1 << (index &
MAST))) != 0;
+ }
+
+ public int bitSize() {
+ return (data.length - offset) * Byte.SIZE;
+ }
+
+ public void toByteArray(byte[] bytes, int offset, int length) {
+ if (length >= 0) {
+ System.arraycopy(data, this.offset, bytes, offset, length);
+ }
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 186fd4799..99efac540 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -86,7 +86,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
schema,
rowType,
FileFormatDiscover.of(options),
- pathFactory());
+ pathFactory(),
+ options.fileIndexReadEnabled());
}
@Override
@@ -145,7 +146,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
- branchName);
+ branchName,
+ options.fileIndexReadEnabled());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 354cc6dda..aeb30731d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -138,7 +138,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
schema,
valueType,
FileFormatDiscover.of(options),
- pathFactory());
+ pathFactory(),
+ options.fileIndexReadEnabled());
}
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9d9f41897..0ae21e6b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
@@ -78,6 +79,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
+ private final FileIndexOptions fileIndexOptions;
private MemorySegmentPool memorySegmentPool;
private MemorySize maxDiskSize;
@@ -100,7 +102,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
String fileCompression,
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
- MemorySize maxDiskSize) {
+ MemorySize maxDiskSize,
+ FileIndexOptions fileIndexOptions) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -120,6 +123,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;
+ this.fileIndexOptions = fileIndexOptions;
this.sinkWriter =
useWriteBuffer
@@ -246,7 +250,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
pathFactory,
seqNumCounter,
fileCompression,
- statsCollectors);
+ statsCollectors,
+ fileIndexOptions);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 30dacd130..fdf7266bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -83,6 +83,9 @@ public class DataFileMeta {
// We have to keep the compatibility.
private final @Nullable Long deleteRowCount;
+ // file index filter bytes, if it is small, store in data file meta
+ private final @Nullable byte[] embeddedIndex;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -91,6 +94,28 @@ public class DataFileMeta {
long minSequenceNumber,
long maxSequenceNumber,
long schemaId) {
+ return forAppend(
+ fileName,
+ fileSize,
+ rowCount,
+ rowStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ Collections.emptyList(),
+ null);
+ }
+
+ public static DataFileMeta forAppend(
+ String fileName,
+ long fileSize,
+ long rowCount,
+ BinaryTableStats rowStats,
+ long minSequenceNumber,
+ long maxSequenceNumber,
+ long schemaId,
+ List<String> extraFiles,
+ @Nullable byte[] embeddedIndex) {
return new DataFileMeta(
fileName,
fileSize,
@@ -103,7 +128,10 @@ public class DataFileMeta {
maxSequenceNumber,
schemaId,
DUMMY_LEVEL,
- 0L);
+ extraFiles,
+
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+ 0L,
+ embeddedIndex);
}
public DataFileMeta(
@@ -118,7 +146,8 @@ public class DataFileMeta {
long maxSequenceNumber,
long schemaId,
int level,
- @Nullable Long deleteRowCount) {
+ @Nullable Long deleteRowCount,
+ @Nullable byte[] embeddedIndex) {
this(
fileName,
fileSize,
@@ -133,7 +162,8 @@ public class DataFileMeta {
level,
Collections.emptyList(),
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
- deleteRowCount);
+ deleteRowCount,
+ embeddedIndex);
}
public DataFileMeta(
@@ -150,12 +180,14 @@ public class DataFileMeta {
int level,
List<String> extraFiles,
Timestamp creationTime,
- @Nullable Long deleteRowCount) {
+ @Nullable Long deleteRowCount,
+ @Nullable byte[] embeddedIndex) {
this.fileName = fileName;
this.fileSize = fileSize;
this.rowCount = rowCount;
+ this.embeddedIndex = embeddedIndex;
this.minKey = minKey;
this.maxKey = maxKey;
this.keyStats = keyStats;
@@ -191,6 +223,10 @@ public class DataFileMeta {
return Optional.ofNullable(deleteRowCount);
}
+ public byte[] embeddedIndex() {
+ return embeddedIndex;
+ }
+
public BinaryRow minKey() {
return minKey;
}
@@ -276,7 +312,8 @@ public class DataFileMeta {
newLevel,
extraFiles,
creationTime,
- deleteRowCount);
+ deleteRowCount,
+ embeddedIndex);
}
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -301,7 +338,8 @@ public class DataFileMeta {
level,
newExtraFiles,
creationTime,
- deleteRowCount);
+ deleteRowCount,
+ embeddedIndex);
}
@Override
@@ -316,6 +354,7 @@ public class DataFileMeta {
return Objects.equals(fileName, that.fileName)
&& fileSize == that.fileSize
&& rowCount == that.rowCount
+ && Objects.equals(embeddedIndex, that.embeddedIndex)
&& Objects.equals(minKey, that.minKey)
&& Objects.equals(maxKey, that.maxKey)
&& Objects.equals(keyStats, that.keyStats)
@@ -335,6 +374,7 @@ public class DataFileMeta {
fileName,
fileSize,
rowCount,
+ embeddedIndex,
minKey,
maxKey,
keyStats,
@@ -358,6 +398,7 @@ public class DataFileMeta {
fileName,
fileSize,
rowCount,
+ embeddedIndex,
minKey,
maxKey,
keyStats,
@@ -387,6 +428,7 @@ public class DataFileMeta {
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false,
newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()));
fields.add(new DataField(13, "_DELETE_ROW_COUNT", new
BigIntType(true)));
+ fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX",
newBytesType(true)));
return new RowType(fields);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index f91e3d293..3de1fcac9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -54,7 +54,8 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
meta.level(),
toStringArrayData(meta.extraFiles()),
meta.creationTime(),
- meta.deleteRowCount().orElse(null));
+ meta.deleteRowCount().orElse(null),
+ meta.embeddedIndex());
}
@Override
@@ -73,6 +74,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
row.getInt(10),
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
- row.isNullAt(13) ? null : row.getLong(13));
+ row.isNullAt(13) ? null : row.getLong(13),
+ row.isNullAt(14) ? null : row.getBinary(14));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 385609de3..ef83e8598 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -34,6 +34,8 @@ public class DataFilePathFactory {
public static final String CHANGELOG_FILE_PREFIX = "changelog-";
+ public static final String INDEX_PATH_SUFFIX = ".index";
+
private final Path parent;
private final String uuid;
@@ -70,6 +72,10 @@ public class DataFilePathFactory {
return uuid;
}
+ public static Path toFileIndexPath(Path filePath) {
+ return new Path(filePath.getParent(), filePath.getName() +
INDEX_PATH_SUFFIX);
+ }
+
public static String formatIdentifier(String fileName) {
int index = fileName.lastIndexOf('.');
if (index == -1) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
new file mode 100644
index 000000000..0a91d4f7f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexPredicate;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** File index reader, do the filter in the constructor. */
+public class FileIndexRecordReader implements RecordReader<InternalRow> {
+
+ private final RecordReader<InternalRow> reader;
+
+ public FileIndexRecordReader(
+ FileIO fileIO,
+ TableSchema dataSchema,
+ List<Predicate> dataFilter,
+ DataFilePathFactory dataFilePathFactory,
+ DataFileMeta file,
+ ConcatRecordReader.ReaderSupplier<InternalRow> readerSupplier)
+ throws IOException {
+ boolean filterThisFile = false;
+ if (dataFilter != null && !dataFilter.isEmpty()) {
+ List<String> indexFiles =
+ file.extraFiles().stream()
+ .filter(name ->
name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+ if (!indexFiles.isEmpty()) {
+ if (indexFiles.size() > 1) {
+ throw new RuntimeException(
+ "Found more than one index file for one data file:
"
+ + String.join(" and ", indexFiles));
+ }
+ // go to file index check
+ try (FileIndexPredicate predicate =
+ new FileIndexPredicate(
+ dataFilePathFactory.toPath(indexFiles.get(0)),
+ fileIO,
+ dataSchema.logicalRowType())) {
+ if (!predicate.testPredicate(
+ PredicateBuilder.and(dataFilter.toArray(new
Predicate[0])))) {
+ filterThisFile = true;
+ }
+ }
+ }
+ }
+
+ this.reader = filterThisFile ? null : readerSupplier.get();
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ return reader == null ? null : reader.readBatch();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
new file mode 100644
index 000000000..6ba635c5e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Index file writer.
+ *
+ * <p>Every row passed to {@link #write(InternalRow)} is forwarded to one index writer per
+ * configured (column, index type) pair. {@link #close()} serializes all of them and either writes
+ * the bytes to {@code path} or keeps them in memory as embedded index bytes, depending on the
+ * configured threshold. {@link #result()} exposes whichever of the two was produced.
+ */
+public final class FileIndexWriter implements Closeable {
+
+    public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null);
+
+    private final FileIO fileIO;
+
+    private final Path path;
+
+    // If the serialized index is larger than this threshold it is written to its own file;
+    // otherwise it is kept as embedded bytes.
+    private final long inManifestThreshold;
+
+    private final List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+
+    private String resultFileName;
+
+    private byte[] embeddedIndexBytes;
+
+    // Makes close() idempotent, as required by the Closeable contract: a repeated call must not
+    // serialize the index again or try to create the index file a second time.
+    private boolean closed = false;
+
+    /**
+     * Creates one index maintainer for every (column, index type) pair in the options.
+     *
+     * @throws IllegalArgumentException if the options name a column that is not in {@code rowType}
+     */
+    public FileIndexWriter(
+            FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
+        this.fileIO = fileIO;
+        this.path = path;
+        List<DataField> fields = rowType.getFields();
+        Map<String, DataField> map = new HashMap<>();
+        Map<String, Integer> index = new HashMap<>();
+        fields.forEach(
+                dataField -> {
+                    map.put(dataField.name(), dataField);
+                    index.put(dataField.name(), rowType.getFieldIndex(dataField.name()));
+                });
+        for (Map.Entry<String, Map<String, Options>> entry : fileIndexOptions.entrySet()) {
+            String columnName = entry.getKey();
+            DataField field = map.get(columnName);
+            if (field == null) {
+                throw new IllegalArgumentException(
+                        columnName + " does not exist in column fields");
+            }
+            for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
+                String indexType = typeEntry.getKey();
+                indexMaintainers.add(
+                        new IndexMaintainer(
+                                columnName,
+                                indexType,
+                                FileIndexer.create(indexType, field.type(), typeEntry.getValue())
+                                        .createWriter(),
+                                InternalRow.createFieldGetter(
+                                        field.type(), index.get(columnName))));
+            }
+        }
+        this.inManifestThreshold = fileIndexOptions.fileIndexInManifestThreshold();
+    }
+
+    /** Feeds the indexed columns of {@code row} to every index maintainer. */
+    public void write(InternalRow row) {
+        indexMaintainers.forEach(indexMaintainer -> indexMaintainer.write(row));
+    }
+
+    /**
+     * Serializes all column indexes and stores the outcome for {@link #result()}. Calling this
+     * method again after the first call has no effect.
+     *
+     * @throws IOException if serializing the indexes or writing the index file fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        // Mark as closed before doing any work so that a failed close is not retried.
+        closed = true;
+
+        // column name -> index type -> serialized index
+        Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
+
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            indexMaps
+                    .computeIfAbsent(indexMaintainer.getColumnName(), k -> new HashMap<>())
+                    .put(indexMaintainer.getIndexType(), indexMaintainer.serializedBytes());
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
+            writer.writeColumnIndexes(indexMaps);
+        }
+
+        if (baos.size() > inManifestThreshold) {
+            try (OutputStream outputStream = fileIO.newOutputStream(path, false)) {
+                outputStream.write(baos.toByteArray());
+            }
+            resultFileName = path.getName();
+        } else {
+            embeddedIndexBytes = baos.toByteArray();
+        }
+    }
+
+    /**
+     * Returns the outcome of {@link #close()}. Both parts of the result are {@code null} if the
+     * writer has not been closed yet.
+     */
+    public FileIndexResult result() {
+        return FileIndexResult.of(embeddedIndexBytes, resultFileName);
+    }
+
+    /** Returns a new writer, or {@code null} when the options are empty. */
+    @Nullable
+    public static FileIndexWriter create(
+            FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
+        return fileIndexOptions.isEmpty()
+                ? null
+                : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
+    }
+
+    /** File index result. */
+    public interface FileIndexResult {
+
+        @Nullable
+        byte[] embeddedIndexBytes();
+
+        @Nullable
+        String independentIndexFile();
+
+        static FileIndexResult of(byte[] embeddedIndexBytes, String resultFileName) {
+            return new FileIndexResult() {
+
+                @Override
+                public byte[] embeddedIndexBytes() {
+                    return embeddedIndexBytes;
+                }
+
+                @Override
+                public String independentIndexFile() {
+                    return resultFileName;
+                }
+            };
+        }
+    }
+
+    /** One index maintainer for one column. */
+    private static class IndexMaintainer {
+
+        private final String columnName;
+        private final String indexType;
+        private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter;
+        private final InternalRow.FieldGetter getter;
+
+        public IndexMaintainer(
+                String columnName,
+                String indexType,
+                org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
+                InternalRow.FieldGetter getter) {
+            this.columnName = columnName;
+            this.indexType = indexType;
+            this.fileIndexWriter = fileIndexWriter;
+            this.getter = getter;
+        }
+
+        public void write(InternalRow row) {
+            fileIndexWriter.write(getter.getFieldOrNull(row));
+        }
+
+        public String getIndexType() {
+            return indexType;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public byte[] serializedBytes() {
+            return fileIndexWriter.serializedBytes();
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
index 0eac2961a..1e12025ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
@@ -41,24 +41,6 @@ public class FileRecordReader implements
RecordReader<InternalRow> {
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
- public FileRecordReader(
- FormatReaderFactory readerFactory,
- FormatReaderFactory.Context context,
- @Nullable int[] indexMapping,
- @Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
- throws IOException {
- try {
- this.reader = readerFactory.createReader(context);
- } catch (Exception e) {
- FileUtils.checkExists(context.fileIO(), context.filePath());
- throw e;
- }
- this.indexMapping = indexMapping;
- this.partitionInfo = partitionInfo;
- this.castMapping = castMapping;
- }
-
@Nullable
@Override
public RecordReader.RecordIterator<InternalRow> readBatch() throws
IOException {
@@ -89,6 +71,24 @@ public class FileRecordReader implements
RecordReader<InternalRow> {
return iterator;
}
+ // Creates the underlying format reader for the file described by `context` and stores the
+ // optional index/cast/partition mappings for later use.
+ public FileRecordReader(
+ FormatReaderFactory readerFactory,
+ FormatReaderFactory.Context context,
+ @Nullable int[] indexMapping,
+ @Nullable CastFieldGetter[] castMapping,
+ @Nullable PartitionInfo partitionInfo)
+ throws IOException {
+ try {
+ this.reader = readerFactory.createReader(context);
+ } catch (Exception e) {
+ // NOTE(review): presumably checkExists raises a clearer error when the file is missing;
+ // otherwise the original exception is rethrown unchanged - confirm against FileUtils.
+ FileUtils.checkExists(context.fileIO(), context.filePath());
+ throw e;
+ }
+ this.indexMapping = indexMapping;
+ this.partitionInfo = partitionInfo;
+ this.castMapping = castMapping;
+ }
+
+
@Override
public void close() throws IOException {
reader.close();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index e2e5441f6..0f1223cca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -168,6 +168,8 @@ public class KeyValueDataFileWriter
maxSeqNumber,
schemaId,
level,
- deleteRecordCount);
+ deleteRecordCount,
+ // TODO: enable file filter for primary key table (e.g.
deletion table).
+ null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 3b085c645..ce441b279 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
@@ -32,8 +33,11 @@ import org.apache.paimon.utils.LongCounter;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collections;
import java.util.function.Function;
+import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+
/**
* A {@link StatsCollectingSingleFileWriter} to write data files containing
{@link InternalRow}.
* Also produces {@link DataFileMeta} after writing a file.
@@ -43,6 +47,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
private final long schemaId;
private final LongCounter seqNumCounter;
private final FieldStatsArraySerializer statsArraySerializer;
+ @Nullable private final FileIndexWriter fileIndexWriter;
public RowDataFileWriter(
FileIO fileIO,
@@ -53,7 +58,8 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
long schemaId,
LongCounter seqNumCounter,
String fileCompression,
- FieldStatsCollector.Factory[] statsCollectors) {
+ FieldStatsCollector.Factory[] statsCollectors,
+ FileIndexOptions fileIndexOptions) {
super(
fileIO,
factory,
@@ -66,17 +72,34 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
+ this.fileIndexWriter =
+ FileIndexWriter.create(
+ fileIO, toFileIndexPath(path), writeSchema,
fileIndexOptions);
}
@Override
public void write(InternalRow row) throws IOException {
super.write(row);
+ // add row to index if needed
+ if (fileIndexWriter != null) {
+ fileIndexWriter.write(row);
+ }
seqNumCounter.add(1L);
}
+ /**
+  * Closes the file index writer (when one exists) and then the parent writer. The parent is
+  * closed in a {@code finally} block so that a failure while finishing the index does not skip
+  * {@code super.close()}; the exception still propagates to the caller.
+  */
+ @Override
+ public void close() throws IOException {
+     try {
+         if (fileIndexWriter != null) {
+             fileIndexWriter.close();
+         }
+     } finally {
+         super.close();
+     }
+ }
+
+
@Override
public DataFileMeta result() throws IOException {
BinaryTableStats stats = statsArraySerializer.toBinary(fieldStats());
+ FileIndexWriter.FileIndexResult indexResult =
+ fileIndexWriter == null ? FileIndexWriter.EMPTY_RESULT :
fileIndexWriter.result();
return DataFileMeta.forAppend(
path.getName(),
fileIO.getFileSize(path),
@@ -84,6 +107,10 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
stats,
seqNumCounter.getValue() - super.recordCount(),
seqNumCounter.getValue() - 1,
- schemaId);
+ schemaId,
+ indexResult.independentIndexFile() == null
+ ? Collections.emptyList()
+ :
Collections.singletonList(indexResult.independentIndexFile()),
+ indexResult.embeddedIndexBytes());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 153929a36..8c92f17c8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.avro.AvroFileFormat;
import org.apache.paimon.fs.FileIO;
@@ -38,7 +39,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
DataFilePathFactory pathFactory,
LongCounter seqNumCounter,
String fileCompression,
- FieldStatsCollector.Factory[] statsCollectors) {
+ FieldStatsCollector.Factory[] statsCollectors,
+ FileIndexOptions fileIndexOptions) {
super(
() ->
new RowDataFileWriter(
@@ -54,7 +56,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
schemaId,
seqNumCounter,
fileCompression,
- statsCollectors),
+ statsCollectors,
+ fileIndexOptions),
targetFileSize);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 866c87d75..baa2e9f4a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
@@ -31,15 +32,25 @@ import org.apache.paimon.stats.FieldStatsConverters;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
private final FieldStatsConverters fieldStatsConverters;
+ private final boolean fileIndexReadEnabled;
+
private Predicate filter;
+ // just cache.
+ private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
+
public AppendOnlyFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketFilter,
@@ -51,7 +62,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
- String branchName) {
+ String branchName,
+ boolean fileIndexReadEnabled) {
super(
partitionType,
bucketFilter,
@@ -66,6 +78,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(),
schema.id());
+ this.fileIndexReadEnabled = fileIndexReadEnabled;
}
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -84,11 +97,13 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
FieldStatsArraySerializer serializer =
fieldStatsConverters.getOrCreate(entry.file().schemaId());
BinaryTableStats stats = entry.file().valueStats();
+
return filter.test(
- entry.file().rowCount(),
- serializer.evolution(stats.minValues()),
- serializer.evolution(stats.maxValues()),
- serializer.evolution(stats.nullCounts(),
entry.file().rowCount()));
+ entry.file().rowCount(),
+ serializer.evolution(stats.minValues()),
+ serializer.evolution(stats.maxValues()),
+ serializer.evolution(stats.nullCounts(),
entry.file().rowCount()))
+ && (!fileIndexReadEnabled ||
testFileIndex(entry.file().embeddedIndex(), entry));
}
@Override
@@ -96,4 +111,24 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
// We don't need to filter per-bucket entries here
return entries;
}
+
+ /**
+  * Tests the scan filter against the index bytes embedded in the given manifest entry's file.
+  *
+  * @return {@code true} when no index bytes are present, otherwise the result of {@link
+  *     FileIndexPredicate#testPredicate}
+  * @throws RuntimeException wrapping any {@link IOException} raised during the check
+  */
+ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
+     if (embeddedIndexBytes == null) {
+         return true;
+     }
+
+     long schemaId = entry.file().schemaId();
+     RowType dataRowType = scanTableSchema(schemaId).logicalRowType();
+
+     // The filter converted for a data schema is computed once per schema id and then reused.
+     Predicate dataPredicate =
+             dataFilterMapping.computeIfAbsent(
+                     schemaId, id -> fieldStatsConverters.convertFilter(id, filter));
+
+     try (FileIndexPredicate indexPredicate =
+             new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
+         boolean mayMatch = indexPredicate.testPredicate(dataPredicate);
+         return mayMatch;
+     } catch (IOException e) {
+         throw new RuntimeException("Exception happens while checking predicate.", e);
+     }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index b3361b0df..fc3f2a3d6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
@@ -74,6 +75,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
private final boolean spillable;
private final MemorySize maxDiskSize;
private final FieldStatsCollector.Factory[] statsCollectors;
+ private final FileIndexOptions fileIndexOptions;
private boolean forceBufferSpill = false;
private boolean skipCompaction;
@@ -109,6 +111,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
this.maxDiskSize = options.writeBufferSpillDiskSize();
this.statsCollectors =
StatsCollectorFactories.createStatsFactories(options,
rowType.getFieldNames());
+ this.fileIndexOptions = options.indexColumnsOptions();
}
@Override
@@ -155,7 +158,8 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
fileCompression,
spillCompression,
statsCollectors,
- maxDiskSize);
+ maxDiskSize,
+ fileIndexOptions);
}
public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -174,7 +178,8 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
pathFactory.createDataFilePathFactory(partition,
bucket),
new
LongCounter(toCompact.get(0).minSequenceNumber()),
fileCompression,
- statsCollectors);
+ statsCollectors,
+ fileIndexOptions);
try {
rewriter.write(bucketReader(partition,
bucket).read(toCompact));
} finally {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index c801dcaa8..cf3b76e67 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.casting.CastFieldGetter;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -25,9 +26,11 @@ import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileIndexRecordReader;
import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
@@ -68,7 +71,8 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
- private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
+ private final Map<FormatKey, RawFileBulkFormatMapping> bulkFormatMappings;
+ private final boolean fileIndexReadEnabled;
private int[][] projection;
@@ -80,13 +84,15 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
TableSchema schema,
RowType rowType,
FileFormatDiscover formatDiscover,
- FileStorePathFactory pathFactory) {
+ FileStorePathFactory pathFactory,
+ boolean fileIndexReadEnabled) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
+ this.fileIndexReadEnabled = fileIndexReadEnabled;
this.projection = Projection.range(0,
rowType.getFieldCount()).toNestedIndexes();
}
@@ -121,7 +127,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
for (DataFileMeta file : split.dataFiles()) {
String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
- BulkFormatMapping bulkFormatMapping =
+ RawFileBulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
new FormatKey(file.schemaId(), formatIdentifier),
this::createBulkFormatMapping);
@@ -140,7 +146,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
return ConcatRecordReader.create(suppliers);
}
- private BulkFormatMapping createBulkFormatMapping(FormatKey key) {
+ private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
TableSchema tableSchema = schema;
TableSchema dataSchema =
key.schemaId == schema.id() ? schema :
schemaManager.schema(key.schemaId);
@@ -180,37 +186,81 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
RowType projectedRowType =
Projection.of(dataProjection).project(dataSchema.logicalRowType());
- return new BulkFormatMapping(
+ return new RawFileBulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionPair,
formatDiscover
.discover(key.format)
- .createReaderFactory(projectedRowType, dataFilters));
+ .createReaderFactory(projectedRowType, dataFilters),
+ dataSchema,
+ dataFilters);
}
private RecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
DataFilePathFactory dataFilePathFactory,
- BulkFormatMapping bulkFormatMapping,
+ RawFileBulkFormatMapping bulkFormatMapping,
DeletionVector.Factory dvFactory)
throws IOException {
- FileRecordReader fileRecordReader =
- new FileRecordReader(
- bulkFormatMapping.getReaderFactory(),
- new FormatReaderContext(
- fileIO,
- dataFilePathFactory.toPath(file.fileName()),
- file.fileSize()),
- bulkFormatMapping.getIndexMapping(),
- bulkFormatMapping.getCastMapping(),
-
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
-
- Optional<DeletionVector> deletionVector =
dvFactory.create(file.fileName());
- if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
- return new ApplyDeletionVectorReader<>(fileRecordReader,
deletionVector.get());
+ ConcatRecordReader.ReaderSupplier<InternalRow> supplier =
+ () -> {
+ FileRecordReader fileRecordReader =
+ new FileRecordReader(
+ bulkFormatMapping.getReaderFactory(),
+ new FormatReaderContext(
+ fileIO,
+
dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize()),
+ bulkFormatMapping.getIndexMapping(),
+ bulkFormatMapping.getCastMapping(),
+ PartitionUtils.create(
+
bulkFormatMapping.getPartitionPair(), partition));
+
+ Optional<DeletionVector> deletionVector =
dvFactory.create(file.fileName());
+ if (deletionVector.isPresent() &&
!deletionVector.get().isEmpty()) {
+ return new ApplyDeletionVectorReader<>(
+ fileRecordReader, deletionVector.get());
+ }
+ return fileRecordReader;
+ };
+
+ return fileIndexReadEnabled
+ ? new FileIndexRecordReader(
+ fileIO,
+ bulkFormatMapping.getDataSchema(),
+ bulkFormatMapping.getDataFilters(),
+ dataFilePathFactory,
+ file,
+ supplier)
+ : supplier.get();
+ }
+
+ /** Bulk format mapping with data schema and data filters. */
+ private static class RawFileBulkFormatMapping extends BulkFormatMapping {
+
+ private final TableSchema dataSchema;
+ private final List<Predicate> dataFilters;
+
+ public RawFileBulkFormatMapping(
+ int[] indexMapping,
+ CastFieldGetter[] castMapping,
+ Pair<int[], RowType> partitionPair,
+ FormatReaderFactory bulkFormat,
+ TableSchema dataSchema,
+ List<Predicate> dataFilters) {
+ super(indexMapping, castMapping, partitionPair, bulkFormat);
+ this.dataSchema = dataSchema;
+ this.dataFilters = dataFilters;
+ }
+
+ public TableSchema getDataSchema() {
+ return dataSchema;
+ }
+
+ public List<Predicate> getDataFilters() {
+ return dataFilters;
}
- return fileRecordReader;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
index 6733e1fea..d3eb0e416 100644
---
a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
+++
b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
@@ -18,13 +18,17 @@
package org.apache.paimon.stats;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.IndexCastMapping;
+import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +75,17 @@ public class FieldStatsConverters {
});
}
+ /**
+  * Converts a filter for use against files written with the given data schema.
+  *
+  * @param dataSchemaId schema id of the data file
+  * @param filter filter to convert
+  * @return {@code filter} itself when {@code dataSchemaId} equals the table schema id, otherwise
+  *     the first element of the list produced by {@code SchemaEvolutionUtil.createDataFilters}
+  */
+ public Predicate convertFilter(long dataSchemaId, Predicate filter) {
+     if (tableSchemaId == dataSchemaId) {
+         return filter;
+     }
+
+     List<Predicate> converted =
+             SchemaEvolutionUtil.createDataFilters(
+                     schemaFields.apply(tableSchemaId),
+                     schemaFields.apply(dataSchemaId),
+                     Collections.singletonList(filter));
+     // NOTE(review): get(0) assumes the converted list is non-empty - confirm that
+     // createDataFilters never drops the single input predicate.
+     return Objects.requireNonNull(converted).get(0);
+ }
+
public List<DataField> tableDataFields() {
return tableDataFields;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index e6209e223..21b8825f7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -186,6 +186,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
0,
0,
0,
- 0L);
+ 0L,
+ null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 565be9f90..9211cfe60 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.disk.ChannelWithMeta;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
@@ -604,7 +605,8 @@ public class AppendOnlyWriterTest {
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
- MemorySize.MAX_VALUE);
+ MemorySize.MAX_VALUE,
+ new FileIndexOptions());
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index a96ee1429..3eff9b7cd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -138,7 +138,8 @@ public class IndexBootstrapTest extends TableTestBase {
Instant.ofEpochMilli(timeMillis)
.atZone(ZoneId.systemDefault())
.toLocalDateTime()),
- 0L);
+ 0L,
+ null);
}
private Pair<InternalRow, Integer> row(int pt, int col, int pk, int
bucket) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index ffa290e0a..1a3503f7a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
@@ -89,7 +90,8 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, SCHEMA.getFieldNames()),
- MemorySize.MAX_VALUE);
+ MemorySize.MAX_VALUE,
+ new FileIndexOptions());
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
appendOnlyWriter.write(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index aed01ca21..3b4921c88 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -161,7 +161,8 @@ public class DataFileTestDataGenerator {
maxSequenceNumber,
0,
level,
- kvs.stream().filter(kv ->
kv.valueKind().isRetract()).count()),
+ kvs.stream().filter(kv ->
kv.valueKind().isRetract()).count(),
+ null),
kvs);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index b902ae967..d5f6ceda3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -51,7 +51,8 @@ public class DataFileTestUtils {
DataFileMeta.DUMMY_LEVEL,
Collections.emptyList(),
Timestamp.fromEpochMillis(100),
- maxSeq - minSeq + 1);
+ maxSeq - minSeq + 1,
+ null);
}
public static DataFileMeta newFile() {
@@ -67,7 +68,8 @@ public class DataFileTestUtils {
0,
0,
0,
- 0L);
+ 0L,
+ null);
}
public static DataFileMeta newFile(
@@ -89,7 +91,8 @@ public class DataFileTestUtils {
maxSequence,
0,
level,
- deleteRowCount);
+ deleteRowCount,
+ null);
}
public static BinaryRow row(int i) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index d91b8a9a0..1801c2dd0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -87,7 +88,8 @@ public class RollingFileWriterTest {
CoreOptions.FILE_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
new CoreOptions(new
HashMap<>()),
- SCHEMA.getFieldNames())),
+ SCHEMA.getFieldNames()),
+ new FileIndexOptions()),
TARGET_FILE_SIZE);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index ad5fe2f80..473b333a6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -115,6 +115,7 @@ public class ManifestCommittableSerializerTest {
1,
0,
level,
- 0L);
+ 0L,
+ null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e066eeaf9..9120e7b6c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -82,7 +82,8 @@ public abstract class ManifestFileMetaTestBase {
0, // not used
Collections.emptyList(),
Timestamp.fromEpochMillis(200000),
- 0L // not used
+ 0L, // not used
+ null // not used
));
}
@@ -243,6 +244,7 @@ public abstract class ManifestFileMetaTestBase {
0, // not used
0, // not used
0, // not used
- 0L));
+ 0L,
+ null));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index c424b6094..4763585b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,6 +69,18 @@ public class LevelsTest {
public static DataFileMeta newFile(int level) {
return new DataFileMeta(
- UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level, 0L);
+ UUID.randomUUID().toString(),
+ 0,
+ 1,
+ row(0),
+ row(0),
+ null,
+ null,
+ 0,
+ 1,
+ 0,
+ level,
+ 0L,
+ null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index c4edd76e2..89007a33a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -181,7 +181,8 @@ public class IntervalPartitionTest {
0,
Collections.emptyList(),
Timestamp.fromEpochMillis(100000),
- 0L);
+ 0L,
+ null);
}
private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>>
sections) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 313f9799a..0d891f2c7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -357,6 +357,6 @@ public class UniversalCompactionTest {
}
static DataFileMeta file(long size) {
- return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L);
+ return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index c8c7be7e1..de95819ad 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -200,7 +200,8 @@ public class ExpireSnapshotsTest {
0,
extraFiles,
Timestamp.now(),
- 0L);
+ 0L,
+ null);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 3846ab234..6893649d5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -20,14 +20,19 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
@@ -41,6 +46,9 @@ import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
@@ -348,6 +356,129 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
commit.close();
}
+ @Test
+ public void testBloomFilterInMemory() throws Exception {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("index_column", DataTypes.STRING())
+ .field("index_column2", DataTypes.INT())
+ .field("index_column3", DataTypes.BIGINT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(
+ CoreOptions.FILE_INDEX
+ + "."
+ + BloomFilterFileIndex.BLOOM_FILTER
+ + "."
+ + CoreOptions.COLUMNS,
+ "index_column, index_column2, index_column3");
+ options.set(
+ CoreOptions.FILE_INDEX
+ + "."
+ + BloomFilterFileIndex.BLOOM_FILTER
+ + ".index_column.items",
+ "150");
+ options.set(
+ CoreOptions.FILE_INDEX
+ + "."
+ + BloomFilterFileIndex.BLOOM_FILTER
+ + ".index_column2.items",
+ "150");
+ options.set(
+ CoreOptions.FILE_INDEX
+ + "."
+ + BloomFilterFileIndex.BLOOM_FILTER
+ + ".index_column3.items",
+ "150");
+ options.set(
+ CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
+ });
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ List<CommitMessage> result = new ArrayList<>();
+ write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+ write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+
+ TableScan.Plan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(rowType)
+ .equal(1, BinaryString.fromString("b")))
+ .plan();
+ List<DataFileMeta> metas =
+ plan.splits().stream()
+ .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+ .collect(Collectors.toList());
+ assertThat(metas.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testBloomFilterInDisk() throws Exception {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("index_column", DataTypes.STRING())
+ .field("index_column2", DataTypes.INT())
+ .field("index_column3", DataTypes.BIGINT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(
+ CoreOptions.FILE_INDEX
+ + "."
+ + BloomFilterFileIndex.BLOOM_FILTER
+ + "."
+ + CoreOptions.COLUMNS,
+ "index_column, index_column2, index_column3");
+ options.set(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
+ });
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ List<CommitMessage> result = new ArrayList<>();
+ write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+ write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+
+ TableScan.Plan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(rowType)
+ .equal(1, BinaryString.fromString("b")))
+ .plan();
+ List<DataFileMeta> metas =
+ plan.splits().stream()
+ .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+ .collect(Collectors.toList());
+ assertThat(metas.size()).isEqualTo(2);
+
+ RecordReader<InternalRow> reader =
+ table.newRead()
+ .withFilter(
+ new PredicateBuilder(rowType)
+ .equal(1, BinaryString.fromString("b")))
+ .createReader(plan.splits());
+ reader.forEachRemaining(row -> assertThat(row.getString(1).toString()).isEqualTo("b"));
+ }
+
@Test
public void testStreamingProjection() throws Exception {
writeData();
@@ -596,4 +727,22 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
""));
return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
+
+ protected FileStoreTable createUnawareBucketFileStoreTable(
+ RowType rowType, Consumer<Options> configure) throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.BUCKET, -1);
+ configure.accept(conf);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a10413005..8223afa5f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -55,7 +55,8 @@ public class SplitGeneratorTest {
maxSequence,
0,
0,
- 0L);
+ 0L,
+ null);
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index d428bd29a..c096371d3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -307,12 +307,22 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT * FROM append_table LIMIT 1")).hasSize(1);
}
+ @Test
+ public void testFileIndex() {
+ batchSql(
+ "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
+
+ assertThat(batchSql("SELECT * FROM index_table WHERE indexc = 'c'"))
+ .containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, "c", "BBB"));
+ }
+
@Override
protected List<String> ddl() {
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
- "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('bucket' = '-1')");
+ "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('bucket' = '-1')",
+ "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc STRING, data STRING) WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc', 'file-index.bloom-filter.indexc.items' = '500')");
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index c607e7a8f..42c125fbb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -75,6 +75,7 @@ public class CompactionTaskSimpleSerializerTest {
1,
0,
0,
- 0L);
+ 0L,
+ null);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 4f53932a2..ef364a1ed 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -112,7 +112,8 @@ public class FileStoreSourceSplitGeneratorTest {
0, // not used
0, // not used
0, // not used
- 0L // not used
+ 0L, // not used
+ null // not used
));
}
return DataSplit.builder()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index cbe6cb86d..8d7d4c04e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -85,7 +85,8 @@ public class FileStoreSourceSplitSerializerTest {
1,
0,
level,
- 0L);
+ 0L,
+ null);
}
public static FileStoreSourceSplit newSourceSplit(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 7fd14b1ad..edd6688da 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -137,7 +137,8 @@ public class TestChangelogDataReadWrite {
schema,
VALUE_TYPE,
FileFormatDiscover.of(options),
- pathFactory);
+ pathFactory,
+ options.fileIndexReadEnabled());
return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
}