This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a74f49d2a [core] Enable file index for map type with map-keys (#3221)
a74f49d2a is described below

commit a74f49d2aae7d62805f7e87b18d45075dcbddbf9
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 26 09:43:07 2024 +0800

    [core] Enable file index for map type with map-keys (#3221)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  70 +-------
 .../{FileIndexWriter.java => FileIndexCommon.java} |  25 ++-
 .../apache/paimon/fileindex/FileIndexFormat.java   | 100 ++++++-----
 .../apache/paimon/fileindex/FileIndexOptions.java  | 196 +++++++++++++++++++--
 .../paimon/fileindex/FileIndexPredicate.java       |   6 +-
 .../apache/paimon/fileindex/FileIndexReader.java   |  28 +--
 .../apache/paimon/fileindex/FileIndexWriter.java   |  19 +-
 .../bloomfilter/BloomFilterFileIndex.java          |   8 +-
 .../fileindex/empty/EmptyFileIndexReader.java      |  71 ++++++++
 .../java/org/apache/paimon/options/Options.java    |   7 +
 .../fileindex/FileIndexFormatFormatTest.java       |  35 +++-
 .../java/org/apache/paimon/io/FileIndexWriter.java | 172 +++++++++++++++---
 .../paimon/table/AppendOnlyFileStoreTableTest.java | 161 ++++++++++++++++-
 13 files changed, 714 insertions(+), 184 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 86bcb91fd..d883a0107 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1738,75 +1738,7 @@ public class CoreOptions implements Serializable {
     }
 
     public FileIndexOptions indexColumnsOptions() {
-        String fileIndexPrefix = FILE_INDEX + ".";
-        String fileIndexColumnSuffix = "." + COLUMNS;
-
-        FileIndexOptions fileIndexOptions = new 
FileIndexOptions(fileIndexInManifestThreshold());
-        for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
-            String key = entry.getKey();
-            if (key.startsWith(fileIndexPrefix)) {
-                // start with file-index, decode this option
-                if (key.endsWith(fileIndexColumnSuffix)) {
-                    // if end with .column, set up indexes
-                    String indexType =
-                            key.substring(
-                                    fileIndexPrefix.length(),
-                                    key.length() - 
fileIndexColumnSuffix.length());
-                    String[] names = entry.getValue().split(",");
-                    for (String name : names) {
-                        if (StringUtils.isBlank(name)) {
-                            throw new IllegalArgumentException(
-                                    "Wrong option in " + key + ", should not 
have empty column");
-                        }
-                        fileIndexOptions.computeIfAbsent(name.trim(), 
indexType);
-                    }
-                } else {
-                    // else, it must be an option
-                    String[] kv = 
key.substring(fileIndexPrefix.length()).split("\\.");
-                    if (kv.length != 3) {
-                        continue;
-                    }
-                    String indexType = kv[0];
-                    String cname = kv[1];
-                    String opkey = kv[2];
-
-                    if (fileIndexOptions.get(cname, indexType) == null) {
-                        // if indexes have not set, find .column in options, 
then set them
-                        String columns =
-                                options.get(fileIndexPrefix + indexType + 
fileIndexColumnSuffix);
-                        if (columns == null) {
-                            continue;
-                        }
-                        String[] names = columns.split(",");
-                        boolean foundTarget = false;
-                        for (String name : names) {
-                            if (StringUtils.isBlank(name)) {
-                                throw new IllegalArgumentException(
-                                        "Wrong option in "
-                                                + key
-                                                + ", should not have empty 
column");
-                            }
-                            String tname = name.trim();
-                            if (cname.equals(tname)) {
-                                foundTarget = true;
-                            }
-                            fileIndexOptions.computeIfAbsent(name.trim(), 
indexType);
-                        }
-                        if (!foundTarget) {
-                            throw new IllegalArgumentException(
-                                    "Wrong option in "
-                                            + key
-                                            + ", can't found column "
-                                            + cname
-                                            + " in "
-                                            + columns);
-                        }
-                    }
-                    fileIndexOptions.get(cname, indexType).set(opkey, 
entry.getValue());
-                }
-            }
-        }
-        return fileIndexOptions;
+        return new FileIndexOptions(this);
     }
 
     public long fileIndexInManifestThreshold() {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexCommon.java
similarity index 50%
copy from 
paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
copy to 
paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexCommon.java
index 9eab19cde..3b8d0c793 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexCommon.java
@@ -18,10 +18,27 @@
 
 package org.apache.paimon.fileindex;
 
-/** To write file index. */
-public interface FileIndexWriter {
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
 
-    void write(Object key);
+import java.util.Map;
+import java.util.Optional;
 
-    byte[] serializedBytes();
+/** Common utility functions shared by the file index classes. */
+public class FileIndexCommon {
+
+    public static String toMapKey(String mapColumnName, String keyName) {
+        return mapColumnName + "[" + keyName + "]";
+    }
+
+    public static DataType getFieldType(Map<String, DataField> fields, String 
columnsName) {
+        Optional<Integer> topLevelIndex = 
FileIndexOptions.topLevelIndexOfNested(columnsName);
+        if (topLevelIndex.isPresent()) {
+            return ((MapType) fields.get(columnsName.substring(0, 
topLevelIndex.get())).type())
+                    .getValueType();
+        } else {
+            return fields.get(columnsName).type();
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index 4f03faf90..9d4a97b1a 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.fileindex;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fileindex.empty.EmptyFileIndexReader;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
@@ -37,9 +38,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -97,6 +98,7 @@ import java.util.stream.Collectors;
 public final class FileIndexFormat {
 
     private static final long MAGIC = 1493475289347502L;
+    private static final int EMPTY_INDEX_FLAG = -1;
 
     enum Version {
         V_1(1);
@@ -145,9 +147,15 @@ public final class FileIndexFormat {
                 Map<String, byte[]> bytesMap = columnMap.getValue();
                 for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
                     int startPosition = baos.size();
-                    baos.write(entry.getValue());
-                    innerMap.put(
-                            entry.getKey(), Pair.of(startPosition, baos.size() 
- startPosition));
+                    byte[] v = entry.getValue();
+                    if (v == null) {
+                        innerMap.put(entry.getKey(), Pair.of(EMPTY_INDEX_FLAG, 
0));
+                    } else {
+                        baos.write(entry.getValue());
+                        innerMap.put(
+                                entry.getKey(),
+                                Pair.of(startPosition, baos.size() - 
startPosition));
+                    }
                 }
             }
             byte[] body = baos.toByteArray();
@@ -180,7 +188,9 @@ public final class FileIndexFormat {
                 for (Map.Entry<String, Pair<Integer, Integer>> indexEntry :
                         entry.getValue().entrySet()) {
                     dataOutputStream.writeUTF(indexEntry.getKey());
-                    dataOutputStream.writeInt(indexEntry.getValue().getLeft() 
+ headLength);
+                    int start = indexEntry.getValue().getLeft();
+                    dataOutputStream.writeInt(
+                            start == EMPTY_INDEX_FLAG ? EMPTY_INDEX_FLAG : 
start + headLength);
                     
dataOutputStream.writeInt(indexEntry.getValue().getRight());
                 }
             }
@@ -275,55 +285,59 @@ public final class FileIndexFormat {
             }
         }
 
-        public List<FileIndexReader> readColumnIndex(String columnName) {
+        public Set<FileIndexReader> readColumnIndex(String columnName) {
             return Optional.ofNullable(header.getOrDefault(columnName, null))
                     .map(
                             f ->
-                                    f.keySet().stream()
+                                    f.entrySet().stream()
                                             .map(
-                                                    indexType ->
-                                                            
readColumnIndex(columnName, indexType))
-                                            .collect(Collectors.toList()))
-                    .orElse(Collections.emptyList());
+                                                    entry ->
+                                                            getFileIndexReader(
+                                                                    columnName,
+                                                                    
entry.getKey(),
+                                                                    
entry.getValue()))
+                                            .collect(Collectors.toSet()))
+                    .orElse(Collections.emptySet());
         }
 
-        public FileIndexReader readColumnIndex(String columnName, String 
indexType) {
+        private FileIndexReader getFileIndexReader(
+                String columnName, String indexType, Pair<Integer, Integer> 
startAndLength) {
+            if (startAndLength.getLeft() == EMPTY_INDEX_FLAG) {
+                return EmptyFileIndexReader.INSTANCE;
+            }
+            return FileIndexer.create(
+                            indexType,
+                            FileIndexCommon.getFieldType(fields, columnName),
+                            new Options())
+                    .createReader(getBytesWithStartAndLength(startAndLength));
+        }
 
-            return readColumnInputStream(columnName, indexType)
-                    .map(
-                            serializedBytes ->
-                                    FileIndexer.create(
-                                                    indexType,
-                                                    
fields.get(columnName).type(),
-                                                    new Options())
-                                            .createReader(serializedBytes))
-                    .orElse(null);
+        private byte[] getBytesWithStartAndLength(Pair<Integer, Integer> 
startAndLength) {
+            byte[] b = new byte[startAndLength.getRight()];
+            try {
+                seekableInputStream.seek(startAndLength.getLeft());
+                int n = 0;
+                int len = b.length;
+                // read fully until b is full else throw.
+                while (n < len) {
+                    int count = seekableInputStream.read(b, n, len - n);
+                    if (count < 0) {
+                        throw new EOFException();
+                    }
+                    n += count;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return b;
         }
 
         @VisibleForTesting
-        Optional<byte[]> readColumnInputStream(String columnName, String 
indexType) {
+        // currently used only by tests
+        Optional<byte[]> getBytesWithNameAndType(String columnName, String 
indexType) {
             return Optional.ofNullable(header.getOrDefault(columnName, null))
-                    .map(m -> m.getOrDefault(indexType, null))
-                    .map(
-                            startAndLength -> {
-                                byte[] b = new byte[startAndLength.getRight()];
-                                try {
-                                    
seekableInputStream.seek(startAndLength.getLeft());
-                                    int n = 0;
-                                    int len = b.length;
-                                    // read fully until b is full else throw.
-                                    while (n < len) {
-                                        int count = 
seekableInputStream.read(b, n, len - n);
-                                        if (count < 0) {
-                                            throw new EOFException();
-                                        }
-                                        n += count;
-                                    }
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-                                return b;
-                            });
+                    .map(i -> i.getOrDefault(indexType, null))
+                    .map(this::getBytesWithStartAndLength);
         }
 
         @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
index e8751245c..a7eb7b952 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
@@ -20,41 +20,153 @@ package org.apache.paimon.fileindex;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
 /** Options of file index column. */
 public class FileIndexOptions {
 
+    public static final String FILE_INDEX = "file-index";
+
+    public static final String COLUMNS = "columns";
+
     // if the filter size greater than fileIndexInManifestThreshold, we put it 
in file
     private final long fileIndexInManifestThreshold;
 
-    private final Map<String, Map<String, Options>> indexTypeOptions;
+    private final Map<Column, Map<String, Options>> indexTypeOptions;
+    private final Map<Column, Map<String, Options>> topLevelMapColumnOptions;
 
     public FileIndexOptions() {
-        
this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
+        this(new CoreOptions(new Options()));
     }
 
-    public FileIndexOptions(long fileIndexInManifestThreshold) {
+    public FileIndexOptions(CoreOptions coreOptions) {
         this.indexTypeOptions = new HashMap<>();
-        this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
+        this.topLevelMapColumnOptions = new HashMap<>();
+        this.fileIndexInManifestThreshold = 
coreOptions.fileIndexInManifestThreshold();
+        setupOptions(coreOptions);
     }
 
-    public void computeIfAbsent(String column, String indexType) {
-        indexTypeOptions
-                .computeIfAbsent(column, c -> new HashMap<>())
-                .computeIfAbsent(indexType, i -> new Options());
+    private void setupOptions(CoreOptions coreOptions) {
+        String fileIndexPrefix = FILE_INDEX + ".";
+        String fileIndexColumnSuffix = "." + COLUMNS;
+
+        Map<String, String> optionMap = new HashMap<>();
+        // find the column to be indexed.
+        for (Map.Entry<String, String> entry : coreOptions.toMap().entrySet()) 
{
+            String key = entry.getKey();
+            if (key.startsWith(fileIndexPrefix)) {
+                // start with file-index, decode this option
+                if (key.endsWith(fileIndexColumnSuffix)) {
+                    // if the key ends with .columns, set up indexes
+                    String indexType =
+                            key.substring(
+                                    fileIndexPrefix.length(),
+                                    key.length() - 
fileIndexColumnSuffix.length());
+                    String[] names = entry.getValue().split(",");
+                    for (String name : names) {
+                        if (StringUtils.isBlank(name)) {
+                            throw new IllegalArgumentException(
+                                    "Wrong option in " + key + ", should not 
have empty column");
+                        }
+                        computeIfAbsent(name.trim(), indexType);
+                    }
+                } else {
+                    optionMap.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+
+        // fill out the options
+        for (Map.Entry<String, String> optionEntry : optionMap.entrySet()) {
+            String key = optionEntry.getKey();
+
+            String[] kv = key.substring(fileIndexPrefix.length()).split("\\.");
+            if (kv.length != 3) {
+                // just ignore options that are not expected
+                continue;
+            }
+            String indexType = kv[0];
+            String cname = kv[1];
+            String opkey = kv[2];
+
+            // if reaches here, must be an option.
+            if (get(cname, indexType) != null) {
+                get(cname, indexType).set(opkey, optionEntry.getValue());
+            } else if (getMapTopLevelOptions(cname, indexType) != null) {
+                getMapTopLevelOptions(cname, indexType).set(opkey, 
optionEntry.getValue());
+            } else {
+                throw new IllegalArgumentException(
+                        "Wrong option in \""
+                                + key
+                                + "\", can't found column \""
+                                + cname
+                                + "\" in \""
+                                + fileIndexPrefix
+                                + indexType
+                                + fileIndexColumnSuffix
+                                + "\"");
+            }
+        }
     }
 
-    public Options get(String column, String indexType) {
-        return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null))
+    private void computeIfAbsent(String column, String indexType) {
+        Optional<Integer> nestedColumnPosition = topLevelIndexOfNested(column);
+        if (nestedColumnPosition.isPresent()) {
+            int position = nestedColumnPosition.get();
+            String columnName = column.substring(0, position);
+            String nestedName = column.substring(position + 1, column.length() 
- 1);
+
+            indexTypeOptions
+                    .computeIfAbsent(new Column(columnName, nestedName), c -> 
new HashMap<>())
+                    .computeIfAbsent(indexType, i -> new Options());
+            topLevelMapColumnOptions
+                    .computeIfAbsent(new Column(columnName), c -> new 
HashMap<>())
+                    .computeIfAbsent(indexType, i -> new Options());
+        } else {
+            indexTypeOptions
+                    .computeIfAbsent(new Column(column), c -> new HashMap<>())
+                    .computeIfAbsent(indexType, i -> new Options());
+        }
+    }
+
+    private Options get(String column, String indexType) {
+        Optional<Integer> nestedColumnPosition = topLevelIndexOfNested(column);
+
+        Column columnKey;
+        if (nestedColumnPosition.isPresent()) {
+            int position = nestedColumnPosition.get();
+            String columnName = column.substring(0, position);
+            String nestedName = column.substring(position + 1, column.length() 
- 1);
+
+            columnKey = new Column(columnName, nestedName);
+        } else {
+            columnKey = new Column(column);
+        }
+
+        return Optional.ofNullable(indexTypeOptions.getOrDefault(columnKey, 
null))
                 .map(x -> x.get(indexType))
                 .orElse(null);
     }
 
+    public Options getMapTopLevelOptions(String column, String indexType) {
+        return Optional.ofNullable(topLevelMapColumnOptions.getOrDefault(new 
Column(column), null))
+                .map(x -> x.get(indexType))
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        "Can't find top level column options 
for map type: "
+                                                + column
+                                                + " "
+                                                + indexType));
+    }
+
     public boolean isEmpty() {
         return indexTypeOptions.isEmpty();
     }
@@ -63,7 +175,69 @@ public class FileIndexOptions {
         return fileIndexInManifestThreshold;
     }
 
-    public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
+    public Set<Map.Entry<Column, Map<String, Options>>> entrySet() {
         return indexTypeOptions.entrySet();
     }
+
+    public static Optional<Integer> topLevelIndexOfNested(String column) {
+        int start = column.indexOf('[');
+        if (start != -1 && column.endsWith("]")) {
+            return Optional.of(start);
+        }
+        return Optional.empty();
+    }
+
+    /** Column to be file indexed. */
+    public static class Column {
+
+        private final String columnName;
+        private final String nestedColumnName;
+        private final boolean isNestedColumn;
+
+        public Column(String columnName) {
+            this.columnName = columnName;
+            this.nestedColumnName = null;
+            this.isNestedColumn = false;
+        }
+
+        public Column(String columnName, String nestedColumnName) {
+            this.columnName = columnName;
+            this.nestedColumnName = nestedColumnName;
+            this.isNestedColumn = true;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public String getNestedColumnName() {
+            if (!isNestedColumn) {
+                throw new RuntimeException(
+                        "Column " + columnName + " is not nested column in 
options.");
+            }
+            return nestedColumnName;
+        }
+
+        public boolean isNestedColumn() {
+            return isNestedColumn;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(new Object[] {columnName, nestedColumnName, 
isNestedColumn});
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof Column)) {
+                return false;
+            }
+
+            Column that = (Column) obj;
+
+            return Objects.equals(columnName, that.columnName)
+                    && Objects.equals(nestedColumnName, that.nestedColumnName)
+                    && isNestedColumn == that.isNestedColumn;
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 06a691848..31672a8b7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -116,9 +117,10 @@ public class FileIndexPredicate implements Closeable {
     private static class FileIndexFieldPredicate implements 
PredicateVisitor<Boolean> {
 
         private final String columnName;
-        private final List<FileIndexReader> fileIndexReaders;
+        private final Collection<FileIndexReader> fileIndexReaders;
 
-        public FileIndexFieldPredicate(String columnName, 
List<FileIndexReader> fileIndexReaders) {
+        public FileIndexFieldPredicate(
+                String columnName, Collection<FileIndexReader> 
fileIndexReaders) {
             this.columnName = columnName;
             this.fileIndexReaders = fileIndexReaders;
         }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
index 8a881b4b8..f4708c2ee 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -27,55 +27,55 @@ import java.util.List;
  * Read file index from serialized bytes. Return true, means we need to search 
this file, else means
  * needn't.
  */
-public interface FileIndexReader extends FunctionVisitor<Boolean> {
+public abstract class FileIndexReader implements FunctionVisitor<Boolean> {
 
     @Override
-    default Boolean visitIsNotNull(FieldRef fieldRef) {
+    public Boolean visitIsNotNull(FieldRef fieldRef) {
         return true;
     }
 
     @Override
-    default Boolean visitIsNull(FieldRef fieldRef) {
+    public Boolean visitIsNull(FieldRef fieldRef) {
         return true;
     }
 
     @Override
-    default Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+    public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+    public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+    public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
+    public Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+    public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitEqual(FieldRef fieldRef, Object literal) {
+    public Boolean visitEqual(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+    public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
         return true;
     }
 
     @Override
-    default Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
+    public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
         for (Object key : literals) {
             if (visitEqual(fieldRef, key)) {
                 return true;
@@ -85,7 +85,7 @@ public interface FileIndexReader extends 
FunctionVisitor<Boolean> {
     }
 
     @Override
-    default Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
+    public Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
         for (Object key : literals) {
             if (visitNotEqual(fieldRef, key)) {
                 return true;
@@ -95,12 +95,12 @@ public interface FileIndexReader extends 
FunctionVisitor<Boolean> {
     }
 
     @Override
-    default Boolean visitAnd(List<Boolean> children) {
+    public Boolean visitAnd(List<Boolean> children) {
         throw new UnsupportedOperationException("Should not invoke this");
     }
 
     @Override
-    default Boolean visitOr(List<Boolean> children) {
+    public Boolean visitOr(List<Boolean> children) {
         throw new UnsupportedOperationException("Should not invoke this");
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
index 9eab19cde..57b583f9b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
@@ -19,9 +19,22 @@
 package org.apache.paimon.fileindex;
 
 /** To write file index. */
-public interface FileIndexWriter {
+public abstract class FileIndexWriter {
 
-    void write(Object key);
+    private boolean empty = true;
 
-    byte[] serializedBytes();
+    public void writeRecord(Object key) {
+        if (key != null) {
+            empty = false;
+            write(key);
+        }
+    }
+
+    public abstract void write(Object key);
+
+    public abstract byte[] serializedBytes();
+
+    public boolean empty() {
+        return empty;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
index 37ba4d205..48f109a63 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -71,7 +71,7 @@ public class BloomFilterFileIndex implements FileIndexer {
         return new Reader(dataType, serializedBytes);
     }
 
-    private static class Writer implements FileIndexWriter {
+    private static class Writer extends FileIndexWriter {
 
         private final BloomFilter64 filter;
         private final FastHash hashFunction;
@@ -83,9 +83,7 @@ public class BloomFilterFileIndex implements FileIndexer {
 
         @Override
         public void write(Object key) {
-            if (key != null) {
-                filter.addHash(hashFunction.hash(key));
-            }
+            filter.addHash(hashFunction.hash(key));
         }
 
         @Override
@@ -102,7 +100,7 @@ public class BloomFilterFileIndex implements FileIndexer {
         }
     }
 
-    private static class Reader implements FileIndexReader {
+    private static class Reader extends FileIndexReader {
 
         private final BloomFilter64 filter;
         private final FastHash hashFunction;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/empty/EmptyFileIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/empty/EmptyFileIndexReader.java
new file mode 100644
index 000000000..cf2eb6812
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/empty/EmptyFileIndexReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.empty;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.predicate.FieldRef;
+
+import java.util.List;
+
+/** Empty file index which has no writer and no serialized bytes. */
+public class EmptyFileIndexReader extends FileIndexReader {
+
+    /** No data in the file index, which means this file has no related 
records. */
+    public static final EmptyFileIndexReader INSTANCE = new 
EmptyFileIndexReader();
+
+    @Override
+    public Boolean visitEqual(FieldRef fieldRef, Object literal) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitIsNotNull(FieldRef fieldRef) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return false;
+    }
+
+    @Override
+    public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
+        return false;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java 
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index 161eef067..b52676420 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -62,6 +62,13 @@ public class Options implements Serializable {
         map.forEach(this::setString);
     }
 
+    /** Creates a new configuration initialized with the options of the given two maps; options 
from map2 are set after those from map1. */
+    public Options(Map<String, String> map1, Map<String, String> map2) {
+        this();
+        map1.forEach(this::setString);
+        map2.forEach(this::setString);
+    }
+
     public static Options fromMap(Map<String, String> map) {
         return new Options(map);
     }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
index 8aaecb152..d9828e75a 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.fileindex;
 
+import org.apache.paimon.fileindex.empty.EmptyFileIndexReader;
 import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.assertj.core.api.Assertions;
@@ -26,6 +28,8 @@ import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -66,10 +70,39 @@ public class FileIndexFormatFormatTest {
             String column = entry.getKey();
             for (String type : entry.getValue().keySet()) {
                 byte[] b =
-                        reader.readColumnInputStream(column, type)
+                        reader.getBytesWithNameAndType(column, type)
                                 .orElseThrow(RuntimeException::new);
                 
Assertions.assertThat(b).containsExactly(indexes.get(column).get(type));
             }
         }
     }
+
+    @Test
+    public void testEmptyFileIndex() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos);
+
+        Map<String, Map<String, byte[]>> indexes = new HashMap<>();
+
+        indexes.computeIfAbsent("a", a -> new HashMap<>()).put("b", null);
+        indexes.computeIfAbsent("a", a -> new HashMap<>()).put("c", null);
+
+        writer.writeColumnIndexes(indexes);
+        writer.close();
+
+        byte[] indexBytes = baos.toByteArray();
+
+        FileIndexFormat.Reader reader =
+                FileIndexFormat.createReader(
+                        new ByteArraySeekableStream(indexBytes),
+                        RowType.builder()
+                                .field("a", DataTypes.BYTES())
+                                .field("b", DataTypes.STRING())
+                                .build());
+
+        Collection<FileIndexReader> fileIndexFormatList = 
reader.readColumnIndex("a");
+        Assertions.assertThat(fileIndexFormatList.size()).isEqualTo(1);
+        Assertions.assertThat(new ArrayList<>(fileIndexFormatList).get(0))
+                .isEqualTo(EmptyFileIndexReader.INSTANCE);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
index 6ba635c5e..0ec473fed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -18,7 +18,10 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexCommon;
 import org.apache.paimon.fileindex.FileIndexFormat;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.fileindex.FileIndexer;
@@ -26,6 +29,9 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -34,7 +40,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +57,7 @@ public final class FileIndexWriter implements Closeable {
     // if the filter size greater than fileIndexInManifestThreshold, we put it 
in file
     private final long inManifestThreshold;
 
-    private final List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+    private final Map<String, IndexMaintainer> indexMaintainers = new 
HashMap<>();
 
     private String resultFileName;
 
@@ -69,39 +75,75 @@ public final class FileIndexWriter implements Closeable {
                     map.put(dataField.name(), dataField);
                     index.put(dataField.name(), 
rowType.getFieldIndex(dataField.name()));
                 });
-        for (Map.Entry<String, Map<String, Options>> entry : 
fileIndexOptions.entrySet()) {
-            String columnName = entry.getKey();
+        for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
+                fileIndexOptions.entrySet()) {
+            FileIndexOptions.Column entryColumn = entry.getKey();
+            String columnName = entryColumn.getColumnName();
             DataField field = map.get(columnName);
             if (field == null) {
                 throw new IllegalArgumentException(columnName + " does not 
exist in column fields");
             }
+
             for (Map.Entry<String, Options> typeEntry : 
entry.getValue().entrySet()) {
                 String indexType = typeEntry.getKey();
-                indexMaintainers.add(
-                        new IndexMaintainer(
-                                columnName,
-                                indexType,
-                                FileIndexer.create(indexType, field.type(), 
typeEntry.getValue())
-                                        .createWriter(),
-                                InternalRow.createFieldGetter(
-                                        field.type(), index.get(columnName))));
+                if (entryColumn.isNestedColumn()) {
+                    if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
+                        throw new IllegalArgumentException(
+                                "Column "
+                                        + columnName
+                                        + " is nested column, but is not map 
type. Only should map type yet.");
+                    }
+                    MapType mapType = (MapType) field.type();
+                    ((MapFileIndexMaintainer)
+                                    indexMaintainers.computeIfAbsent(
+                                            columnName,
+                                            name ->
+                                                    new MapFileIndexMaintainer(
+                                                            columnName,
+                                                            indexType,
+                                                            
mapType.getKeyType(),
+                                                            
mapType.getValueType(),
+                                                            
fileIndexOptions.getMapTopLevelOptions(
+                                                                    
columnName, typeEntry.getKey()),
+                                                            
index.get(columnName))))
+                            .add(entryColumn.getNestedColumnName(), 
typeEntry.getValue());
+                } else {
+                    indexMaintainers.computeIfAbsent(
+                            columnName,
+                            name ->
+                                    new FileIndexMaintainer(
+                                            columnName,
+                                            indexType,
+                                            FileIndexer.create(
+                                                            indexType,
+                                                            field.type(),
+                                                            
typeEntry.getValue())
+                                                    .createWriter(),
+                                            InternalRow.createFieldGetter(
+                                                    field.type(), 
index.get(columnName))));
+                }
             }
         }
         this.inManifestThreshold = 
fileIndexOptions.fileIndexInManifestThreshold();
     }
 
     public void write(InternalRow row) {
-        indexMaintainers.forEach(indexMaintainer -> 
indexMaintainer.write(row));
+        indexMaintainers
+                .values()
+                .forEach(mapFileIndexMaintainer -> 
mapFileIndexMaintainer.write(row));
     }
 
     @Override
     public void close() throws IOException {
         Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
 
-        for (IndexMaintainer indexMaintainer : indexMaintainers) {
-            indexMaps
-                    .computeIfAbsent(indexMaintainer.getColumnName(), k -> new 
HashMap<>())
-                    .put(indexMaintainer.getIndexType(), 
indexMaintainer.serializedBytes());
+        for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
+            Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
+            for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
+                indexMaps
+                        .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
+                        .put(indexMaintainer.getIndexType(), entry.getValue());
+            }
         }
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -156,15 +198,24 @@ public final class FileIndexWriter implements Closeable {
         }
     }
 
+    interface IndexMaintainer {
+
+        void write(InternalRow row);
+
+        String getIndexType();
+
+        Map<String, byte[]> serializedBytes();
+    }
+
     /** One index maintainer for one column. */
-    private static class IndexMaintainer {
+    private static class FileIndexMaintainer implements IndexMaintainer {
 
         private final String columnName;
         private final String indexType;
         private final org.apache.paimon.fileindex.FileIndexWriter 
fileIndexWriter;
         private final InternalRow.FieldGetter getter;
 
-        public IndexMaintainer(
+        public FileIndexMaintainer(
                 String columnName,
                 String indexType,
                 org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
@@ -176,19 +227,92 @@ public final class FileIndexWriter implements Closeable {
         }
 
         public void write(InternalRow row) {
-            fileIndexWriter.write(getter.getFieldOrNull(row));
+            fileIndexWriter.writeRecord(getter.getFieldOrNull(row));
         }
 
         public String getIndexType() {
             return indexType;
         }
 
-        public String getColumnName() {
-            return columnName;
+        public Map<String, byte[]> serializedBytes() {
+            return Collections.singletonMap(columnName, 
fileIndexWriter.serializedBytes());
+        }
+    }
+
+    /** Index maintainer for a map column; keeps one file index writer per indexed map key. */
+    private static class MapFileIndexMaintainer implements IndexMaintainer {
+
+        private final String columnName;
+        private final String indexType;
+        private final Options options;
+        private final DataType valueType;
+        private final Map<String, org.apache.paimon.fileindex.FileIndexWriter> 
indexWritersMap;
+        private final InternalArray.ElementGetter valueElementGetter;
+        private final int position;
+
+        public MapFileIndexMaintainer(
+                String columnName,
+                String indexType,
+                DataType keyType,
+                DataType valueType,
+                Options options,
+                int position) {
+            this.columnName = columnName;
+            this.indexType = indexType;
+            this.valueType = valueType;
+            this.options = options;
+            this.position = position;
+            this.indexWritersMap = new HashMap<>();
+            this.valueElementGetter = 
InternalArray.createElementGetter(valueType);
+
+            DataTypeRoot rootType = keyType.getTypeRoot();
+            if (rootType != DataTypeRoot.CHAR && rootType != 
DataTypeRoot.VARCHAR) {
+                throw new IllegalArgumentException(
+                        "Only support map data type with key field of 
CHAR、VARCHAR、STRING.");
+            }
+        }
+
+        public void write(InternalRow row) {
+            InternalMap internalMap = row.getMap(position);
+            InternalArray keyArray = internalMap.keyArray();
+            InternalArray valueArray = internalMap.valueArray();
+
+            for (int i = 0; i < keyArray.size(); i++) {
+                String key = keyArray.getString(i).toString();
+                org.apache.paimon.fileindex.FileIndexWriter writer =
+                        indexWritersMap.getOrDefault(key, null);
+                if (writer != null) {
+                    
writer.writeRecord(valueElementGetter.getElementOrNull(valueArray, i));
+                }
+            }
+        }
+
+        public void add(String nestedKey, Options nestedOptions) {
+            indexWritersMap.put(
+                    nestedKey,
+                    FileIndexer.create(
+                                    indexType,
+                                    valueType,
+                                    new Options(options.toMap(), 
nestedOptions.toMap()))
+                            .createWriter());
+        }
+
+        public String getIndexType() {
+            return indexType;
         }
 
-        public byte[] serializedBytes() {
-            return fileIndexWriter.serializedBytes();
+        public Map<String, byte[]> serializedBytes() {
+            Map<String, byte[]> result = new HashMap<>();
+            indexWritersMap.forEach(
+                    (k, v) -> {
+                        if (!v.empty()) {
+                            result.put(
+                                    FileIndexCommon.toMapKey(columnName, k), 
v.serializedBytes());
+                        } else {
+                            result.put(FileIndexCommon.toMapKey(columnName, 
k), null);
+                        }
+                    });
+            return result;
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 6893649d5..73a9b96be 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -21,15 +21,19 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
@@ -63,6 +67,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
 import static 
org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -371,32 +376,31 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                         rowType,
                         options -> {
                             options.set(
-                                    CoreOptions.FILE_INDEX
+                                    FileIndexOptions.FILE_INDEX
                                             + "."
                                             + BloomFilterFileIndex.BLOOM_FILTER
                                             + "."
                                             + CoreOptions.COLUMNS,
                                     "index_column, index_column2, 
index_column3");
                             options.set(
-                                    CoreOptions.FILE_INDEX
+                                    FileIndexOptions.FILE_INDEX
                                             + "."
                                             + BloomFilterFileIndex.BLOOM_FILTER
                                             + ".index_column.items",
                                     "150");
                             options.set(
-                                    CoreOptions.FILE_INDEX
+                                    FileIndexOptions.FILE_INDEX
                                             + "."
                                             + BloomFilterFileIndex.BLOOM_FILTER
                                             + ".index_column2.items",
                                     "150");
                             options.set(
-                                    CoreOptions.FILE_INDEX
+                                    FileIndexOptions.FILE_INDEX
                                             + "."
                                             + BloomFilterFileIndex.BLOOM_FILTER
                                             + ".index_column3.items",
                                     "150");
-                            options.set(
-                                    
CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
+                            
options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
                         });
 
         StreamTableWrite write = table.newWrite(commitUser);
@@ -438,13 +442,13 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                         rowType,
                         options -> {
                             options.set(
-                                    CoreOptions.FILE_INDEX
+                                    FileIndexOptions.FILE_INDEX
                                             + "."
                                             + BloomFilterFileIndex.BLOOM_FILTER
                                             + "."
                                             + CoreOptions.COLUMNS,
                                     "index_column, index_column2, 
index_column3");
-                            
options.set(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
+                            
options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
                         });
 
         StreamTableWrite write = table.newWrite(commitUser);
@@ -479,6 +483,147 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         reader.forEachRemaining(row -> 
assertThat(row.getString(1).toString()).isEqualTo("b"));
     }
 
+    @Test
+    public void testBloomFilterForMapField() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("index_column", DataTypes.STRING())
+                        .field("index_column2", DataTypes.INT())
+                        .field(
+                                "index_column3",
+                                DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "index_column, index_column2, 
index_column3[a], index_column3[b], index_column3[c], index_column3[d]");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column.items",
+                                    "150");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column2.items",
+                                    "150");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column3.items",
+                                    "150");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column3[a].items",
+                                    "10000");
+                        });
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("a"),
+                        2,
+                        new GenericMap(
+                                new HashMap<BinaryString, BinaryString>() {
+                                    {
+                                        put(
+                                                BinaryString.fromString("a"),
+                                                
BinaryString.fromString("10086"));
+                                        put(
+                                                BinaryString.fromString("b"),
+                                                
BinaryString.fromString("1008611"));
+                                        put(
+                                                BinaryString.fromString("c"),
+                                                
BinaryString.fromString("1008612"));
+                                        put(
+                                                BinaryString.fromString("d"),
+                                                
BinaryString.fromString("1008613"));
+                                    }
+                                })));
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("c"),
+                        2,
+                        new GenericMap(
+                                new HashMap<BinaryString, BinaryString>() {
+                                    {
+                                        put(
+                                                BinaryString.fromString("a"),
+                                                
BinaryString.fromString("我是一个粉刷匠"));
+                                        put(
+                                                BinaryString.fromString("b"),
+                                                
BinaryString.fromString("啦啦啦"));
+                                        put(
+                                                BinaryString.fromString("c"),
+                                                
BinaryString.fromString("快乐的粉刷匠"));
+                                        put(
+                                                BinaryString.fromString("d"),
+                                                
BinaryString.fromString("大风大雨去刷墙"));
+                                    }
+                                })));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("b"),
+                        2,
+                        new GenericMap(
+                                new HashMap<BinaryString, BinaryString>() {
+                                    {
+                                        put(
+                                                BinaryString.fromString("a"),
+                                                BinaryString.fromString("I am 
a good girl"));
+                                        put(
+                                                BinaryString.fromString("b"),
+                                                BinaryString.fromString("A 
good girl"));
+                                        put(
+                                                BinaryString.fromString("c"),
+                                                BinaryString.fromString("Good 
girl"));
+                                        put(
+                                                BinaryString.fromString("d"),
+                                                
BinaryString.fromString("Girl"));
+                                    }
+                                })));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+        Predicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                        3,
+                        "index_column3[a]",
+                        Collections.singletonList(BinaryString.fromString("I 
am a good girl")));
+        TableScan.Plan plan = table.newScan().withFilter(predicate).plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(split -> ((DataSplit) 
split).dataFiles().stream())
+                        .collect(Collectors.toList());
+        assertThat(metas.size()).isEqualTo(2);
+
+        RecordReader<InternalRow> reader =
+                
table.newRead().withFilter(predicate).createReader(plan.splits());
+        reader.forEachRemaining(row -> 
assertThat(row.getString(1).toString()).isEqualTo("b"));
+    }
+
     @Test
     public void testStreamingProjection() throws Exception {
         writeData();

Reply via email to