This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 70b5656610 [core] Support creating multiple indexes on one column and
correct parameter name for specifying bitmap index version (#5589)
70b5656610 is described below
commit 70b56566104a8f6d66dd7e1f08ebab386ee0c44a
Author: Zhonghang Liu <[email protected]>
AuthorDate: Tue May 13 23:02:48 2025 +0800
[core] Support creating multiple indexes on one column and correct
parameter name for specifying bitmap index version (#5589)
---
docs/content/concepts/spec/fileindex.md | 2 +-
.../org/apache/paimon/io/DataFileIndexWriter.java | 31 +--
.../apache/paimon/io/DataFileIndexWriterTest.java | 211 +++++++++++++++++++++
3 files changed, 232 insertions(+), 12 deletions(-)
diff --git a/docs/content/concepts/spec/fileindex.md b/docs/content/concepts/spec/fileindex.md
index 2cede8afe1..c5c4835140 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -170,7 +170,7 @@ length: 4 bytes int
(Legacy) Bitmap file index format (V1):
-You can configure `file-index.bitmap.version` to use legacy bitmap version 1.
+You can configure `file-index.bitmap.<column_name>.version` to use legacy bitmap version 1.
<pre>
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
index 2ba0283b7b..2c620dd143 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
@@ -60,7 +60,8 @@ public final class DataFileIndexWriter implements Closeable {
// if the filter size greater than fileIndexInManifestThreshold, we put it
in file
private final long inManifestThreshold;
- private final Map<String, IndexMaintainer> indexMaintainers = new
HashMap<>();
+ // index type, column name -> index maintainer
+ private final Map<String, Map<String, IndexMaintainer>> indexMaintainers =
new HashMap<>();
private String resultFileName;
@@ -101,7 +102,9 @@ public final class DataFileIndexWriter implements Closeable
{
for (Map.Entry<String, Options> typeEntry :
entry.getValue().entrySet()) {
String indexType = typeEntry.getKey();
- IndexMaintainer maintainer = indexMaintainers.get(columnName);
+ Map<String, IndexMaintainer> column2maintainers =
+ indexMaintainers.computeIfAbsent(indexType, k -> new
HashMap<>());
+ IndexMaintainer maintainer =
column2maintainers.get(columnName);
if (entryColumn.isNestedColumn()) {
if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
throw new IllegalArgumentException(
@@ -121,7 +124,7 @@ public final class DataFileIndexWriter implements Closeable
{
fileIndexOptions.getMapTopLevelOptions(
columnName,
typeEntry.getKey()),
index.get(columnName));
- indexMaintainers.put(columnName, mapMaintainer);
+ column2maintainers.put(columnName, mapMaintainer);
}
mapMaintainer.add(entryColumn.getNestedColumnName(),
typeEntry.getValue());
} else {
@@ -137,7 +140,7 @@ public final class DataFileIndexWriter implements Closeable
{
.createWriter(),
InternalRow.createFieldGetter(
field.type(),
index.get(columnName)));
- indexMaintainers.put(columnName, maintainer);
+ column2maintainers.put(columnName, maintainer);
}
}
}
@@ -146,7 +149,11 @@ public final class DataFileIndexWriter implements
Closeable {
}
public void write(InternalRow row) {
- indexMaintainers.values().forEach(index -> index.write(row));
+ indexMaintainers
+ .values()
+ .forEach(
+ column2maintainers ->
+ column2maintainers.values().forEach(index ->
index.write(row)));
}
@Override
@@ -170,12 +177,14 @@ public final class DataFileIndexWriter implements
Closeable {
public Map<String, Map<String, byte[]>> serializeMaintainers() {
Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
- for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
- Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
- for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
- indexMaps
- .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
- .put(indexMaintainer.getIndexType(), entry.getValue());
+ for (Map<String, IndexMaintainer> columnIndexMaintainers :
indexMaintainers.values()) {
+ for (IndexMaintainer indexMaintainer :
columnIndexMaintainers.values()) {
+ Map<String, byte[]> mapBytes =
indexMaintainer.serializedBytes();
+ for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
+ indexMaps
+ .computeIfAbsent(entry.getKey(), k -> new
HashMap<>())
+ .put(indexMaintainer.getIndexType(),
entry.getValue());
+ }
}
}
return indexMaps;
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
new file mode 100644
index 0000000000..71c60fa4bc
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
+/** Tests for {@link DataFileIndexWriter}. */
+public class DataFileIndexWriterTest {
+
+ @TempDir java.nio.file.Path tempFile;
+
+ FileIO fileIO = LocalFileIO.create();
+
+ boolean bitmapExist = false;
+ boolean bsiExist = false;
+ boolean bloomExists = false;
+
+ @Test
+ public void testCreatingMultipleIndexesOnOneColumn() throws Exception {
+
+ String tableName = "test";
+ String col1 = "f0";
+ String col2 = "f1";
+ Identifier identifier = Identifier.create(tableName, tableName);
+
+ Map<String, String> optionsMap = new HashMap<>();
+ optionsMap.put("file-index.bitmap.columns", col1);
+ optionsMap.put("file-index.bsi.columns", col1);
+ optionsMap.put("file-index.bloom-filter.columns", col2);
+ optionsMap.put("file-index.read.enabled", "true");
+ optionsMap.put("file-index.in-manifest-threshold", "1B");
+
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.options(optionsMap);
+ schemaBuilder.column(col1, DataTypes.INT());
+ schemaBuilder.column(col2, DataTypes.INT());
+ Schema schema = schemaBuilder.build();
+
+ Options catalogOptions = new Options();
+ catalogOptions.set(CatalogOptions.WAREHOUSE,
tempFile.toUri().toString());
+ catalogOptions.set(CACHE_ENABLED, false);
+ CatalogContext context = CatalogContext.create(catalogOptions);
+ FileSystemCatalog catalog = (FileSystemCatalog)
CatalogFactory.createCatalog(context);
+ catalog.createDatabase(tableName, false);
+ catalog.createTable(identifier, schema, false);
+ Table table = catalog.getTable(identifier);
+
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ IOManager ioManager = new IOManagerImpl("/tmp");
+ BatchTableWrite write = writeBuilder.newWrite();
+ write.withIOManager(ioManager);
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(1, 2));
+ write.write(GenericRow.of(2, 3));
+ List<CommitMessage> commitMessages = write.prepareCommit();
+ writeBuilder.newCommit().commit(commitMessages);
+
+ foreachIndexReader(
+ catalog,
+ tableName,
+ col1,
+ fileIndexReader -> {
+ String className = fileIndexReader.getClass().getName();
+ if (className.endsWith(".BitmapFileIndex$Reader")) {
+ bitmapExist = true;
+ } else if
(className.endsWith(".BitSliceIndexBitmapFileIndex$Reader")) {
+ bsiExist = true;
+ } else {
+ throw new RuntimeException("unknown file index reader:
" + className);
+ }
+ BitmapIndexResult result =
+ (BitmapIndexResult)
+ fileIndexReader.visitEqual(
+ new FieldRef(0, col1,
DataTypes.INT()), 1);
+ assert result.get().equals(RoaringBitmap32.bitmapOf(0, 1));
+ });
+
+ foreachIndexReader(
+ catalog,
+ tableName,
+ col2,
+ fileIndexReader -> {
+ String className = fileIndexReader.getClass().getName();
+ if (className.endsWith(".BloomFilterFileIndex$Reader")) {
+ bloomExists = true;
+ }
+ });
+
+ assert bitmapExist;
+ assert bsiExist;
+ assert bloomExists;
+ }
+
+ protected void foreachIndexReader(
+ FileSystemCatalog fileSystemCatalog,
+ String tableName,
+ String columnName,
+ Consumer<FileIndexReader> consumer)
+ throws Catalog.TableNotExistException {
+ Path tableRoot =
+
fileSystemCatalog.getTableLocation(Identifier.create(tableName, tableName));
+ SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+ FileStorePathFactory pathFactory =
+ new FileStorePathFactory(
+ tableRoot,
+ RowType.of(),
+ new CoreOptions(new Options()).partitionDefaultName(),
+ CoreOptions.FILE_FORMAT.defaultValue(),
+ CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ null,
+ null);
+
+ Table table = fileSystemCatalog.getTable(Identifier.create(tableName,
tableName));
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(
+ dataSplit.partition(), dataSplit.bucket());
+ for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) {
+ TableSchema tableSchema =
schemaManager.schema(dataFileMeta.schemaId());
+ List<String> indexFiles =
+ dataFileMeta.extraFiles().stream()
+ .filter(
+ name ->
+ name.endsWith(
+
DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+ // assert index file exist and only one index file
+ assert indexFiles.size() == 1;
+ try (FileIndexFormat.Reader reader =
+ FileIndexFormat.createReader(
+ fileIO.newInputStream(
+ dataFilePathFactory.toAlignedPath(
+ indexFiles.get(0),
dataFileMeta)),
+ tableSchema.logicalRowType())) {
+ Set<FileIndexReader> fileIndexReaders =
reader.readColumnIndex(columnName);
+ for (FileIndexReader fileIndexReader : fileIndexReaders) {
+ consumer.accept(fileIndexReader);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}