This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a74f49d2a [core] Enable file index for map type with map-keys (#3221)
a74f49d2a is described below
commit a74f49d2aae7d62805f7e87b18d45075dcbddbf9
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 26 09:43:07 2024 +0800
[core] Enable file index for map type with map-keys (#3221)
---
.../main/java/org/apache/paimon/CoreOptions.java | 70 +-------
.../{FileIndexWriter.java => FileIndexCommon.java} | 25 ++-
.../apache/paimon/fileindex/FileIndexFormat.java | 100 ++++++-----
.../apache/paimon/fileindex/FileIndexOptions.java | 196 +++++++++++++++++++--
.../paimon/fileindex/FileIndexPredicate.java | 6 +-
.../apache/paimon/fileindex/FileIndexReader.java | 28 +--
.../apache/paimon/fileindex/FileIndexWriter.java | 19 +-
.../bloomfilter/BloomFilterFileIndex.java | 8 +-
.../fileindex/empty/EmptyFileIndexReader.java | 71 ++++++++
.../java/org/apache/paimon/options/Options.java | 7 +
.../fileindex/FileIndexFormatFormatTest.java | 35 +++-
.../java/org/apache/paimon/io/FileIndexWriter.java | 172 +++++++++++++++---
.../paimon/table/AppendOnlyFileStoreTableTest.java | 161 ++++++++++++++++-
13 files changed, 714 insertions(+), 184 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 86bcb91fd..d883a0107 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1738,75 +1738,7 @@ public class CoreOptions implements Serializable {
}
public FileIndexOptions indexColumnsOptions() {
- String fileIndexPrefix = FILE_INDEX + ".";
- String fileIndexColumnSuffix = "." + COLUMNS;
-
- FileIndexOptions fileIndexOptions = new
FileIndexOptions(fileIndexInManifestThreshold());
- for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
- String key = entry.getKey();
- if (key.startsWith(fileIndexPrefix)) {
- // start with file-index, decode this option
- if (key.endsWith(fileIndexColumnSuffix)) {
- // if end with .column, set up indexes
- String indexType =
- key.substring(
- fileIndexPrefix.length(),
- key.length() -
fileIndexColumnSuffix.length());
- String[] names = entry.getValue().split(",");
- for (String name : names) {
- if (StringUtils.isBlank(name)) {
- throw new IllegalArgumentException(
- "Wrong option in " + key + ", should not
have empty column");
- }
- fileIndexOptions.computeIfAbsent(name.trim(),
indexType);
- }
- } else {
- // else, it must be an option
- String[] kv =
key.substring(fileIndexPrefix.length()).split("\\.");
- if (kv.length != 3) {
- continue;
- }
- String indexType = kv[0];
- String cname = kv[1];
- String opkey = kv[2];
-
- if (fileIndexOptions.get(cname, indexType) == null) {
- // if indexes have not set, find .column in options,
then set them
- String columns =
- options.get(fileIndexPrefix + indexType +
fileIndexColumnSuffix);
- if (columns == null) {
- continue;
- }
- String[] names = columns.split(",");
- boolean foundTarget = false;
- for (String name : names) {
- if (StringUtils.isBlank(name)) {
- throw new IllegalArgumentException(
- "Wrong option in "
- + key
- + ", should not have empty
column");
- }
- String tname = name.trim();
- if (cname.equals(tname)) {
- foundTarget = true;
- }
- fileIndexOptions.computeIfAbsent(name.trim(),
indexType);
- }
- if (!foundTarget) {
- throw new IllegalArgumentException(
- "Wrong option in "
- + key
- + ", can't found column "
- + cname
- + " in "
- + columns);
- }
- }
- fileIndexOptions.get(cname, indexType).set(opkey,
entry.getValue());
- }
- }
- }
- return fileIndexOptions;
+ return new FileIndexOptions(this);
}
public long fileIndexInManifestThreshold() {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexCommon.java
similarity index 50%
copy from
paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
copy to
paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexCommon.java
index 9eab19cde..3b8d0c793 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexCommon.java
@@ -18,10 +18,27 @@
package org.apache.paimon.fileindex;
-/** To write file index. */
-public interface FileIndexWriter {
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
- void write(Object key);
+import java.util.Map;
+import java.util.Optional;
- byte[] serializedBytes();
+/** Common helper functions for file indexes. */
+public class FileIndexCommon {
+
+ public static String toMapKey(String mapColumnName, String keyName) {
+ return mapColumnName + "[" + keyName + "]";
+ }
+
+ public static DataType getFieldType(Map<String, DataField> fields, String
columnsName) {
+ Optional<Integer> topLevelIndex =
FileIndexOptions.topLevelIndexOfNested(columnsName);
+ if (topLevelIndex.isPresent()) {
+ return ((MapType) fields.get(columnsName.substring(0,
topLevelIndex.get())).type())
+ .getValueType();
+ } else {
+ return fields.get(columnsName).type();
+ }
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index 4f03faf90..9d4a97b1a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -19,6 +19,7 @@
package org.apache.paimon.fileindex;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fileindex.empty.EmptyFileIndexReader;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
@@ -37,9 +38,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -97,6 +98,7 @@ import java.util.stream.Collectors;
public final class FileIndexFormat {
private static final long MAGIC = 1493475289347502L;
+ private static final int EMPTY_INDEX_FLAG = -1;
enum Version {
V_1(1);
@@ -145,9 +147,15 @@ public final class FileIndexFormat {
Map<String, byte[]> bytesMap = columnMap.getValue();
for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
int startPosition = baos.size();
- baos.write(entry.getValue());
- innerMap.put(
- entry.getKey(), Pair.of(startPosition, baos.size()
- startPosition));
+ byte[] v = entry.getValue();
+ if (v == null) {
+ innerMap.put(entry.getKey(), Pair.of(EMPTY_INDEX_FLAG,
0));
+ } else {
+ baos.write(entry.getValue());
+ innerMap.put(
+ entry.getKey(),
+ Pair.of(startPosition, baos.size() -
startPosition));
+ }
}
}
byte[] body = baos.toByteArray();
@@ -180,7 +188,9 @@ public final class FileIndexFormat {
for (Map.Entry<String, Pair<Integer, Integer>> indexEntry :
entry.getValue().entrySet()) {
dataOutputStream.writeUTF(indexEntry.getKey());
- dataOutputStream.writeInt(indexEntry.getValue().getLeft()
+ headLength);
+ int start = indexEntry.getValue().getLeft();
+ dataOutputStream.writeInt(
+ start == EMPTY_INDEX_FLAG ? EMPTY_INDEX_FLAG :
start + headLength);
dataOutputStream.writeInt(indexEntry.getValue().getRight());
}
}
@@ -275,55 +285,59 @@ public final class FileIndexFormat {
}
}
- public List<FileIndexReader> readColumnIndex(String columnName) {
+ public Set<FileIndexReader> readColumnIndex(String columnName) {
return Optional.ofNullable(header.getOrDefault(columnName, null))
.map(
f ->
- f.keySet().stream()
+ f.entrySet().stream()
.map(
- indexType ->
-
readColumnIndex(columnName, indexType))
- .collect(Collectors.toList()))
- .orElse(Collections.emptyList());
+ entry ->
+ getFileIndexReader(
+ columnName,
+
entry.getKey(),
+
entry.getValue()))
+ .collect(Collectors.toSet()))
+ .orElse(Collections.emptySet());
}
- public FileIndexReader readColumnIndex(String columnName, String
indexType) {
+ private FileIndexReader getFileIndexReader(
+ String columnName, String indexType, Pair<Integer, Integer>
startAndLength) {
+ if (startAndLength.getLeft() == EMPTY_INDEX_FLAG) {
+ return EmptyFileIndexReader.INSTANCE;
+ }
+ return FileIndexer.create(
+ indexType,
+ FileIndexCommon.getFieldType(fields, columnName),
+ new Options())
+ .createReader(getBytesWithStartAndLength(startAndLength));
+ }
- return readColumnInputStream(columnName, indexType)
- .map(
- serializedBytes ->
- FileIndexer.create(
- indexType,
-
fields.get(columnName).type(),
- new Options())
- .createReader(serializedBytes))
- .orElse(null);
+ private byte[] getBytesWithStartAndLength(Pair<Integer, Integer>
startAndLength) {
+ byte[] b = new byte[startAndLength.getRight()];
+ try {
+ seekableInputStream.seek(startAndLength.getLeft());
+ int n = 0;
+ int len = b.length;
+ // read fully until b is full else throw.
+ while (n < len) {
+ int count = seekableInputStream.read(b, n, len - n);
+ if (count < 0) {
+ throw new EOFException();
+ }
+ n += count;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return b;
}
@VisibleForTesting
- Optional<byte[]> readColumnInputStream(String columnName, String
indexType) {
+ // currently only used by tests
+ Optional<byte[]> getBytesWithNameAndType(String columnName, String
indexType) {
return Optional.ofNullable(header.getOrDefault(columnName, null))
- .map(m -> m.getOrDefault(indexType, null))
- .map(
- startAndLength -> {
- byte[] b = new byte[startAndLength.getRight()];
- try {
-
seekableInputStream.seek(startAndLength.getLeft());
- int n = 0;
- int len = b.length;
- // read fully until b is full else throw.
- while (n < len) {
- int count =
seekableInputStream.read(b, n, len - n);
- if (count < 0) {
- throw new EOFException();
- }
- n += count;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return b;
- });
+ .map(i -> i.getOrDefault(indexType, null))
+ .map(this::getBytesWithStartAndLength);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
index e8751245c..a7eb7b952 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
@@ -20,41 +20,153 @@ package org.apache.paimon.fileindex;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/** Options of file index column. */
public class FileIndexOptions {
+ public static final String FILE_INDEX = "file-index";
+
+ public static final String COLUMNS = "columns";
+
// if the filter size greater than fileIndexInManifestThreshold, we put it
in file
private final long fileIndexInManifestThreshold;
- private final Map<String, Map<String, Options>> indexTypeOptions;
+ private final Map<Column, Map<String, Options>> indexTypeOptions;
+ private final Map<Column, Map<String, Options>> topLevelMapColumnOptions;
public FileIndexOptions() {
-
this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
+ this(new CoreOptions(new Options()));
}
- public FileIndexOptions(long fileIndexInManifestThreshold) {
+ public FileIndexOptions(CoreOptions coreOptions) {
this.indexTypeOptions = new HashMap<>();
- this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
+ this.topLevelMapColumnOptions = new HashMap<>();
+ this.fileIndexInManifestThreshold =
coreOptions.fileIndexInManifestThreshold();
+ setupOptions(coreOptions);
}
- public void computeIfAbsent(String column, String indexType) {
- indexTypeOptions
- .computeIfAbsent(column, c -> new HashMap<>())
- .computeIfAbsent(indexType, i -> new Options());
+ private void setupOptions(CoreOptions coreOptions) {
+ String fileIndexPrefix = FILE_INDEX + ".";
+ String fileIndexColumnSuffix = "." + COLUMNS;
+
+ Map<String, String> optionMap = new HashMap<>();
+ // find the column to be indexed.
+ for (Map.Entry<String, String> entry : coreOptions.toMap().entrySet())
{
+ String key = entry.getKey();
+ if (key.startsWith(fileIndexPrefix)) {
+ // start with file-index, decode this option
+ if (key.endsWith(fileIndexColumnSuffix)) {
+ // if it ends with .columns, set up indexes for the listed columns
+ String indexType =
+ key.substring(
+ fileIndexPrefix.length(),
+ key.length() -
fileIndexColumnSuffix.length());
+ String[] names = entry.getValue().split(",");
+ for (String name : names) {
+ if (StringUtils.isBlank(name)) {
+ throw new IllegalArgumentException(
+ "Wrong option in " + key + ", should not
have empty column");
+ }
+ computeIfAbsent(name.trim(), indexType);
+ }
+ } else {
+ optionMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ // fill out the options
+ for (Map.Entry<String, String> optionEntry : optionMap.entrySet()) {
+ String key = optionEntry.getKey();
+
+ String[] kv = key.substring(fileIndexPrefix.length()).split("\\.");
+ if (kv.length != 3) {
+ // ignore keys that are not of the form <index-type>.<column>.<option>
+ continue;
+ }
+ String indexType = kv[0];
+ String cname = kv[1];
+ String opkey = kv[2];
+
+ // if we reach here, the key is an option of the form <index-type>.<column>.<option>.
+ if (get(cname, indexType) != null) {
+ get(cname, indexType).set(opkey, optionEntry.getValue());
+ } else if (getMapTopLevelOptions(cname, indexType) != null) {
+ getMapTopLevelOptions(cname, indexType).set(opkey,
optionEntry.getValue());
+ } else {
+ throw new IllegalArgumentException(
+ "Wrong option in \""
+ + key
+ + "\", can't found column \""
+ + cname
+ + "\" in \""
+ + fileIndexPrefix
+ + indexType
+ + fileIndexColumnSuffix
+ + "\"");
+ }
+ }
}
- public Options get(String column, String indexType) {
- return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null))
+ private void computeIfAbsent(String column, String indexType) {
+ Optional<Integer> nestedColumnPosition = topLevelIndexOfNested(column);
+ if (nestedColumnPosition.isPresent()) {
+ int position = nestedColumnPosition.get();
+ String columnName = column.substring(0, position);
+ String nestedName = column.substring(position + 1, column.length()
- 1);
+
+ indexTypeOptions
+ .computeIfAbsent(new Column(columnName, nestedName), c ->
new HashMap<>())
+ .computeIfAbsent(indexType, i -> new Options());
+ topLevelMapColumnOptions
+ .computeIfAbsent(new Column(columnName), c -> new
HashMap<>())
+ .computeIfAbsent(indexType, i -> new Options());
+ } else {
+ indexTypeOptions
+ .computeIfAbsent(new Column(column), c -> new HashMap<>())
+ .computeIfAbsent(indexType, i -> new Options());
+ }
+ }
+
+ private Options get(String column, String indexType) {
+ Optional<Integer> nestedColumnPosition = topLevelIndexOfNested(column);
+
+ Column columnKey;
+ if (nestedColumnPosition.isPresent()) {
+ int position = nestedColumnPosition.get();
+ String columnName = column.substring(0, position);
+ String nestedName = column.substring(position + 1, column.length()
- 1);
+
+ columnKey = new Column(columnName, nestedName);
+ } else {
+ columnKey = new Column(column);
+ }
+
+ return Optional.ofNullable(indexTypeOptions.getOrDefault(columnKey,
null))
.map(x -> x.get(indexType))
.orElse(null);
}
+ public Options getMapTopLevelOptions(String column, String indexType) {
+ return Optional.ofNullable(topLevelMapColumnOptions.getOrDefault(new
Column(column), null))
+ .map(x -> x.get(indexType))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Can't find top level column options
for map type: "
+ + column
+ + " "
+ + indexType));
+ }
+
public boolean isEmpty() {
return indexTypeOptions.isEmpty();
}
@@ -63,7 +175,69 @@ public class FileIndexOptions {
return fileIndexInManifestThreshold;
}
- public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
+ public Set<Map.Entry<Column, Map<String, Options>>> entrySet() {
return indexTypeOptions.entrySet();
}
+
+ public static Optional<Integer> topLevelIndexOfNested(String column) {
+ int start = column.indexOf('[');
+ if (start != -1 && column.endsWith("]")) {
+ return Optional.of(start);
+ }
+ return Optional.empty();
+ }
+
+ /** Column to be file indexed. */
+ public static class Column {
+
+ private final String columnName;
+ private final String nestedColumnName;
+ private final boolean isNestedColumn;
+
+ public Column(String columnName) {
+ this.columnName = columnName;
+ this.nestedColumnName = null;
+ this.isNestedColumn = false;
+ }
+
+ public Column(String columnName, String nestedColumnName) {
+ this.columnName = columnName;
+ this.nestedColumnName = nestedColumnName;
+ this.isNestedColumn = true;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public String getNestedColumnName() {
+ if (!isNestedColumn) {
+ throw new RuntimeException(
+ "Column " + columnName + " is not nested column in
options.");
+ }
+ return nestedColumnName;
+ }
+
+ public boolean isNestedColumn() {
+ return isNestedColumn;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(new Object[] {columnName, nestedColumnName,
isNestedColumn});
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof Column)) {
+ return false;
+ }
+
+ Column that = (Column) obj;
+
+ return Objects.equals(columnName, that.columnName)
+ && Objects.equals(nestedColumnName, that.nestedColumnName)
+ && isNestedColumn == that.isNestedColumn;
+ }
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 06a691848..31672a8b7 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -116,9 +117,10 @@ public class FileIndexPredicate implements Closeable {
private static class FileIndexFieldPredicate implements
PredicateVisitor<Boolean> {
private final String columnName;
- private final List<FileIndexReader> fileIndexReaders;
+ private final Collection<FileIndexReader> fileIndexReaders;
- public FileIndexFieldPredicate(String columnName,
List<FileIndexReader> fileIndexReaders) {
+ public FileIndexFieldPredicate(
+ String columnName, Collection<FileIndexReader>
fileIndexReaders) {
this.columnName = columnName;
this.fileIndexReaders = fileIndexReaders;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
index 8a881b4b8..f4708c2ee 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -27,55 +27,55 @@ import java.util.List;
* Read file index from serialized bytes. Return true, means we need to search
this file, else means
* needn't.
*/
-public interface FileIndexReader extends FunctionVisitor<Boolean> {
+public abstract class FileIndexReader implements FunctionVisitor<Boolean> {
@Override
- default Boolean visitIsNotNull(FieldRef fieldRef) {
+ public Boolean visitIsNotNull(FieldRef fieldRef) {
return true;
}
@Override
- default Boolean visitIsNull(FieldRef fieldRef) {
+ public Boolean visitIsNull(FieldRef fieldRef) {
return true;
}
@Override
- default Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+ public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+ public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+ public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
+ public Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+ public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitEqual(FieldRef fieldRef, Object literal) {
+ public Boolean visitEqual(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+ public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
return true;
}
@Override
- default Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
+ public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
for (Object key : literals) {
if (visitEqual(fieldRef, key)) {
return true;
@@ -85,7 +85,7 @@ public interface FileIndexReader extends
FunctionVisitor<Boolean> {
}
@Override
- default Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
+ public Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
for (Object key : literals) {
if (visitNotEqual(fieldRef, key)) {
return true;
@@ -95,12 +95,12 @@ public interface FileIndexReader extends
FunctionVisitor<Boolean> {
}
@Override
- default Boolean visitAnd(List<Boolean> children) {
+ public Boolean visitAnd(List<Boolean> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
@Override
- default Boolean visitOr(List<Boolean> children) {
+ public Boolean visitOr(List<Boolean> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
index 9eab19cde..57b583f9b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
@@ -19,9 +19,22 @@
package org.apache.paimon.fileindex;
/** To write file index. */
-public interface FileIndexWriter {
+public abstract class FileIndexWriter {
- void write(Object key);
+ private boolean empty = true;
- byte[] serializedBytes();
+ public void writeRecord(Object key) {
+ if (key != null) {
+ empty = false;
+ write(key);
+ }
+ }
+
+ public abstract void write(Object key);
+
+ public abstract byte[] serializedBytes();
+
+ public boolean empty() {
+ return empty;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
index 37ba4d205..48f109a63 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -71,7 +71,7 @@ public class BloomFilterFileIndex implements FileIndexer {
return new Reader(dataType, serializedBytes);
}
- private static class Writer implements FileIndexWriter {
+ private static class Writer extends FileIndexWriter {
private final BloomFilter64 filter;
private final FastHash hashFunction;
@@ -83,9 +83,7 @@ public class BloomFilterFileIndex implements FileIndexer {
@Override
public void write(Object key) {
- if (key != null) {
- filter.addHash(hashFunction.hash(key));
- }
+ filter.addHash(hashFunction.hash(key));
}
@Override
@@ -102,7 +100,7 @@ public class BloomFilterFileIndex implements FileIndexer {
}
}
- private static class Reader implements FileIndexReader {
+ private static class Reader extends FileIndexReader {
private final BloomFilter64 filter;
private final FastHash hashFunction;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/empty/EmptyFileIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/empty/EmptyFileIndexReader.java
new file mode 100644
index 000000000..cf2eb6812
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/empty/EmptyFileIndexReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.empty;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.predicate.FieldRef;
+
+import java.util.List;
+
+/** Empty file index which has no writer and no serialized bytes. */
+public class EmptyFileIndexReader extends FileIndexReader {
+
+ /** No data in the file index, which means this file has no related
records. */
+ public static final EmptyFileIndexReader INSTANCE = new
EmptyFileIndexReader();
+
+ @Override
+ public Boolean visitEqual(FieldRef fieldRef, Object literal) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitIsNotNull(FieldRef fieldRef) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+ return false;
+ }
+
+ @Override
+ public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
+ return false;
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index 161eef067..b52676420 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -62,6 +62,13 @@ public class Options implements Serializable {
map.forEach(this::setString);
}
+ /** Creates a new configuration that is initialized with the options of
the two given maps, applied in order (map1 first, then map2). */
+ public Options(Map<String, String> map1, Map<String, String> map2) {
+ this();
+ map1.forEach(this::setString);
+ map2.forEach(this::setString);
+ }
+
public static Options fromMap(Map<String, String> map) {
return new Options(map);
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
index 8aaecb152..d9828e75a 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
@@ -18,7 +18,9 @@
package org.apache.paimon.fileindex;
+import org.apache.paimon.fileindex.empty.EmptyFileIndexReader;
import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
@@ -26,6 +28,8 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -66,10 +70,39 @@ public class FileIndexFormatFormatTest {
String column = entry.getKey();
for (String type : entry.getValue().keySet()) {
byte[] b =
- reader.readColumnInputStream(column, type)
+ reader.getBytesWithNameAndType(column, type)
.orElseThrow(RuntimeException::new);
Assertions.assertThat(b).containsExactly(indexes.get(column).get(type));
}
}
}
+
+ @Test
+ public void testEmptyFileIndex() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos);
+
+ Map<String, Map<String, byte[]>> indexes = new HashMap<>();
+
+ indexes.computeIfAbsent("a", a -> new HashMap<>()).put("b", null);
+ indexes.computeIfAbsent("a", a -> new HashMap<>()).put("c", null);
+
+ writer.writeColumnIndexes(indexes);
+ writer.close();
+
+ byte[] indexBytes = baos.toByteArray();
+
+ FileIndexFormat.Reader reader =
+ FileIndexFormat.createReader(
+ new ByteArraySeekableStream(indexBytes),
+ RowType.builder()
+ .field("a", DataTypes.BYTES())
+ .field("b", DataTypes.STRING())
+ .build());
+
+ Collection<FileIndexReader> fileIndexFormatList =
reader.readColumnIndex("a");
+ Assertions.assertThat(fileIndexFormatList.size()).isEqualTo(1);
+ Assertions.assertThat(new ArrayList<>(fileIndexFormatList).get(0))
+ .isEqualTo(EmptyFileIndexReader.INSTANCE);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
index 6ba635c5e..0ec473fed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -18,7 +18,10 @@
package org.apache.paimon.io;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFormat;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fileindex.FileIndexer;
@@ -26,6 +29,9 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -34,7 +40,7 @@ import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,7 +57,7 @@ public final class FileIndexWriter implements Closeable {
     // if the filter size greater than fileIndexInManifestThreshold, we put it in file
     private final long inManifestThreshold;
-    private final List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+    private final Map<String, IndexMaintainer> indexMaintainers = new HashMap<>();
     private String resultFileName;
@@ -69,39 +75,85 @@ public final class FileIndexWriter implements Closeable {
                     map.put(dataField.name(), dataField);
                     index.put(dataField.name(), rowType.getFieldIndex(dataField.name()));
                 });
-        for (Map.Entry<String, Map<String, Options>> entry : fileIndexOptions.entrySet()) {
-            String columnName = entry.getKey();
+        for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
+                fileIndexOptions.entrySet()) {
+            FileIndexOptions.Column entryColumn = entry.getKey();
+            String columnName = entryColumn.getColumnName();
             DataField field = map.get(columnName);
             if (field == null) {
                 throw new IllegalArgumentException(columnName + " does not exist in column fields");
             }
+
             for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
                 String indexType = typeEntry.getKey();
-                indexMaintainers.add(
-                        new IndexMaintainer(
-                                columnName,
-                                indexType,
-                                FileIndexer.create(indexType, field.type(), typeEntry.getValue())
-                                        .createWriter(),
-                                InternalRow.createFieldGetter(
-                                        field.type(), index.get(columnName))));
+                // Key by column AND index type: keying by column alone made any further index type
+                // on the same column be dropped (plain) or merged into the first one (nested).
+                String key = maintainerKey(columnName, indexType, entryColumn.isNestedColumn());
+                if (entryColumn.isNestedColumn()) {
+                    if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
+                        throw new IllegalArgumentException(
+                                "Column "
+                                        + columnName
+                                        + " is nested column, but is not map type. Only map type is supported for nested columns.");
+                    }
+                    MapType mapType = (MapType) field.type();
+                    // All configured keys of one map column share one maintainer per index type.
+                    ((MapFileIndexMaintainer)
+                                    indexMaintainers.computeIfAbsent(
+                                            key,
+                                            k ->
+                                                    new MapFileIndexMaintainer(
+                                                            columnName,
+                                                            indexType,
+                                                            mapType.getKeyType(),
+                                                            mapType.getValueType(),
+                                                            fileIndexOptions.getMapTopLevelOptions(
+                                                                    columnName, indexType),
+                                                            index.get(columnName))))
+                            .add(entryColumn.getNestedColumnName(), typeEntry.getValue());
+                } else {
+                    indexMaintainers.put(
+                            key,
+                            new FileIndexMaintainer(
+                                    columnName,
+                                    indexType,
+                                    FileIndexer.create(
+                                                    indexType, field.type(), typeEntry.getValue())
+                                            .createWriter(),
+                                    InternalRow.createFieldGetter(
+                                            field.type(), index.get(columnName))));
+                }
             }
         }
         this.inManifestThreshold = fileIndexOptions.fileIndexInManifestThreshold();
     }
+    /**
+     * Key under which a maintainer is registered: unique per column, index type and whether the
+     * entry targets map keys. NUL is used as separator on the assumption that it never occurs in
+     * index type or column names.
+     */
+    private static String maintainerKey(String columnName, String indexType, boolean nested) {
+        return (nested ? "nested" : "plain") + "\0" + indexType + "\0" + columnName;
+    }
+
     public void write(InternalRow row) {
-        indexMaintainers.forEach(indexMaintainer -> indexMaintainer.write(row));
+        for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
+            indexMaintainer.write(row);
+        }
     }
     @Override
     public void close() throws IOException {
         Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
-        for (IndexMaintainer indexMaintainer : indexMaintainers) {
-            indexMaps
-                    .computeIfAbsent(indexMaintainer.getColumnName(), k -> new HashMap<>())
-                    .put(indexMaintainer.getIndexType(), indexMaintainer.serializedBytes());
+        for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
+            Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
+            for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
+                indexMaps
+                        .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
+                        .put(indexMaintainer.getIndexType(), entry.getValue());
+            }
         }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -156,15 +198,25 @@ public final class FileIndexWriter implements Closeable {
         }
     }
+    /** Maintains index data for one column and serializes it per indexed (sub-)column name. */
+    interface IndexMaintainer {
+
+        void write(InternalRow row);
+
+        String getIndexType();
+
+        Map<String, byte[]> serializedBytes();
+    }
+
     /** One index maintainer for one column. */
-    private static class IndexMaintainer {
+    private static class FileIndexMaintainer implements IndexMaintainer {
         private final String columnName;
         private final String indexType;
         private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter;
         private final InternalRow.FieldGetter getter;
-        public IndexMaintainer(
+        public FileIndexMaintainer(
                 String columnName,
                 String indexType,
                 org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
@@ -176,19 +227,109 @@ public final class FileIndexWriter implements Closeable {
         }
+        @Override
         public void write(InternalRow row) {
-            fileIndexWriter.write(getter.getFieldOrNull(row));
+            fileIndexWriter.writeRecord(getter.getFieldOrNull(row));
         }
+        @Override
         public String getIndexType() {
             return indexType;
         }
-        public String getColumnName() {
-            return columnName;
+        @Override
+        public Map<String, byte[]> serializedBytes() {
+            return Collections.singletonMap(columnName, fileIndexWriter.serializedBytes());
+        }
+    }
+
+    /**
+     * Index maintainer for a MAP column: holds one index writer per configured map key and feeds
+     * each writer the value stored under that key in every row written.
+     */
+    private static class MapFileIndexMaintainer implements IndexMaintainer {
+
+        private final String columnName;
+        private final String indexType;
+        private final Options options;
+        private final DataType valueType;
+        private final Map<String, org.apache.paimon.fileindex.FileIndexWriter> indexWritersMap;
+        private final InternalArray.ElementGetter valueElementGetter;
+        private final int position;
+
+        public MapFileIndexMaintainer(
+                String columnName,
+                String indexType,
+                DataType keyType,
+                DataType valueType,
+                Options options,
+                int position) {
+            this.columnName = columnName;
+            this.indexType = indexType;
+            this.valueType = valueType;
+            this.options = options;
+            this.position = position;
+            this.indexWritersMap = new HashMap<>();
+            this.valueElementGetter = InternalArray.createElementGetter(valueType);
+
+            DataTypeRoot rootType = keyType.getTypeRoot();
+            if (rootType != DataTypeRoot.CHAR && rootType != DataTypeRoot.VARCHAR) {
+                throw new IllegalArgumentException(
+                        "Only support map data type with key field of CHAR, VARCHAR, STRING.");
+            }
+        }
+
+        @Override
+        public void write(InternalRow row) {
+            // A null map has no entries, so there is nothing to index for this row.
+            if (row.isNullAt(position)) {
+                return;
+            }
+            InternalMap internalMap = row.getMap(position);
+            InternalArray keyArray = internalMap.keyArray();
+            InternalArray valueArray = internalMap.valueArray();
+
+            for (int i = 0; i < keyArray.size(); i++) {
+                if (keyArray.isNullAt(i)) {
+                    continue;
+                }
+                org.apache.paimon.fileindex.FileIndexWriter writer =
+                        indexWritersMap.get(keyArray.getString(i).toString());
+                // Only keys registered through add(...) are indexed; other keys are skipped.
+                if (writer != null) {
+                    writer.writeRecord(valueElementGetter.getElementOrNull(valueArray, i));
+                }
+            }
+        }
+
+        /**
+         * Registers an index writer for map key {@code nestedKey}, created from the column-level
+         * options combined with {@code nestedOptions}.
+         */
+        public void add(String nestedKey, Options nestedOptions) {
+            indexWritersMap.put(
+                    nestedKey,
+                    FileIndexer.create(
+                                    indexType,
+                                    valueType,
+                                    new Options(options.toMap(), nestedOptions.toMap()))
+                            .createWriter());
+        }
+
+        @Override
+        public String getIndexType() {
+            return indexType;
         }
-        public byte[] serializedBytes() {
-            return fileIndexWriter.serializedBytes();
+        @Override
+        public Map<String, byte[]> serializedBytes() {
+            // One entry per registered key; a writer reporting empty() is emitted with null bytes.
+            Map<String, byte[]> result = new HashMap<>();
+            indexWritersMap.forEach(
+                    (key, writer) ->
+                            result.put(
+                                    FileIndexCommon.toMapKey(columnName, key),
+                                    writer.empty() ? null : writer.serializedBytes()));
+            return result;
         }
     }
 }
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 6893649d5..73a9b96be 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -21,15 +21,19 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
@@ -63,6 +67,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
import static
org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
import static org.assertj.core.api.Assertions.assertThat;
@@ -371,32 +376,31 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
rowType,
options -> {
options.set(
- CoreOptions.FILE_INDEX
+ FileIndexOptions.FILE_INDEX
+ "."
+ BloomFilterFileIndex.BLOOM_FILTER
+ "."
+ CoreOptions.COLUMNS,
"index_column, index_column2,
index_column3");
options.set(
- CoreOptions.FILE_INDEX
+ FileIndexOptions.FILE_INDEX
+ "."
+ BloomFilterFileIndex.BLOOM_FILTER
+ ".index_column.items",
"150");
options.set(
- CoreOptions.FILE_INDEX
+ FileIndexOptions.FILE_INDEX
+ "."
+ BloomFilterFileIndex.BLOOM_FILTER
+ ".index_column2.items",
"150");
options.set(
- CoreOptions.FILE_INDEX
+ FileIndexOptions.FILE_INDEX
+ "."
+ BloomFilterFileIndex.BLOOM_FILTER
+ ".index_column3.items",
"150");
- options.set(
-
CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
+
options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B"); // index data larger than this is put in a separate file
});
StreamTableWrite write = table.newWrite(commitUser);
@@ -438,13 +442,13 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
rowType,
options -> {
options.set(
- CoreOptions.FILE_INDEX
+ FileIndexOptions.FILE_INDEX
+ "."
+ BloomFilterFileIndex.BLOOM_FILTER
+ "."
+ CoreOptions.COLUMNS,
"index_column, index_column2,
index_column3");
-
options.set(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
+
options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B"); // index data larger than this is put in a separate file
});
StreamTableWrite write = table.newWrite(commitUser);
@@ -479,6 +483,124 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         reader.forEachRemaining(row -> assertThat(row.getString(1).toString()).isEqualTo("b"));
     }
+    @Test
+    public void testBloomFilterForMapField() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("index_column", DataTypes.STRING())
+                        .field("index_column2", DataTypes.INT())
+                        .field(
+                                "index_column3",
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "index_column, index_column2, index_column3[a], index_column3[b], index_column3[c], index_column3[d]");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column.items",
+                                    "150");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column2.items",
+                                    "150");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column3.items",
+                                    "150");
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column3[a].items",
+                                    "10000");
+                        });
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("a"),
+                        2,
+                        stringMap("a", "10086", "b", "1008611", "c", "1008612", "d", "1008613")));
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("c"),
+                        2,
+                        stringMap(
+                                "a", "我是一个粉刷匠", "b", "啦啦啦", "c", "快乐的粉刷匠", "d", "大风大雨去刷墙")));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("b"),
+                        2,
+                        stringMap(
+                                "a",
+                                "I am a good girl",
+                                "b",
+                                "A good girl",
+                                "c",
+                                "Good girl",
+                                "d",
+                                "Girl")));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+        Predicate predicate =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                        3,
+                        "index_column3[a]",
+                        Collections.singletonList(BinaryString.fromString("I am a good girl")));
+        TableScan.Plan plan = table.newScan().withFilter(predicate).plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+                        .collect(Collectors.toList());
+        // NOTE(review): the scan keeps both files; presumably the 10000-item index on
+        // index_column3[a] exceeds the in-manifest threshold, so pruning only happens on read.
+        assertThat(metas.size()).isEqualTo(2);
+
+        RecordReader<InternalRow> reader =
+                table.newRead().withFilter(predicate).createReader(plan.splits());
+        // Collect first so the assertion also fails when no row comes back at all.
+        List<String> indexColumnValues = new ArrayList<>();
+        reader.forEachRemaining(row -> indexColumnValues.add(row.getString(1).toString()));
+        assertThat(indexColumnValues).containsExactly("b");
+    }
+
+    /** Builds a {@link GenericMap} of strings from alternating key and value arguments. */
+    private static GenericMap stringMap(String... keysAndValues) {
+        HashMap<BinaryString, BinaryString> entries = new HashMap<>();
+        for (int i = 0; i < keysAndValues.length; i += 2) {
+            entries.put(
+                    BinaryString.fromString(keysAndValues[i]),
+                    BinaryString.fromString(keysAndValues[i + 1]));
+        }
+        return new GenericMap(entries);
+    }
+
     @Test
     public void testStreamingProjection() throws Exception {
         writeData();