This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5c49750a4 [core] File index format should include all type index and
columns (#3190)
5c49750a4 is described below
commit 5c49750a48f812ec2e483d00d0b49562cad4fa3f
Author: YeJunHao <[email protected]>
AuthorDate: Wed Apr 10 16:47:23 2024 +0800
[core] File index format should include all type index and columns (#3190)
---
.../apache/paimon/fileindex/FileIndexFormat.java | 126 ++++++++++++++-------
.../paimon/fileindex/FileIndexPredicate.java | 22 ++--
.../fileindex/FileIndexFormatFormatTest.java | 25 ++--
3 files changed, 117 insertions(+), 56 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index a7a3c534e..ec809dff4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -35,9 +35,12 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* File index file format. Put all column and offset in the header.
@@ -46,13 +49,23 @@ import java.util.Optional;
* _______________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
- * | index type |body info size|
+ * | column size |
* |-------------------------------------|
- * | column name 1 |start pos |length |
+ * | column 1 | index size |
+ * |-------------------------------------|
+ * | index name 1 |start pos |length |
+ * |-------------------------------------|
+ * | index name 2 |start pos |length |
+ * |-------------------------------------|
+ * | index name 3 |start pos |length |
* |-------------------------------------| HEAD
- * | column name 2 |start pos |length |
+ * | column 2 | index size |
+ * |-------------------------------------|
+ * | index name 1 |start pos |length |
* |-------------------------------------|
- * | column name 3 |start pos |length |
+ * | index name 2 |start pos |length |
+ * |-------------------------------------|
+ * | index name 3 |start pos |length |
* |-------------------------------------|
* | ... |
* |-------------------------------------|
@@ -118,30 +131,35 @@ public final class FileIndexFormat {
this.dataOutputStream = new DataOutputStream(outputStream);
}
- public void writeColumnIndex(String indexType, Map<String, byte[]>
bytesMap)
+ public void writeColumnIndexes(Map<String, Map<String, byte[]>>
indexes)
throws IOException {
- Map<String, Pair<Integer, Integer>> bodyInfo = new HashMap<>();
+ Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new
HashMap<>();
// construct body
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
- for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
- int startPosition = baos.size();
- baos.write(entry.getValue());
- bodyInfo.put(entry.getKey(), Pair.of(startPosition,
baos.size() - startPosition));
+ for (Map.Entry<String, Map<String, byte[]>> columnMap :
indexes.entrySet()) {
+ Map<String, Pair<Integer, Integer>> innerMap =
+ bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new
HashMap<>());
+ Map<String, byte[]> bytesMap = columnMap.getValue();
+ for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
+ int startPosition = baos.size();
+ baos.write(entry.getValue());
+ innerMap.put(
+ entry.getKey(), Pair.of(startPosition, baos.size()
- startPosition));
+ }
}
byte[] body = baos.toByteArray();
-
- writeHead(indexType, bodyInfo);
+ writeHead(bodyInfo);
// writeBody
dataOutputStream.write(body);
}
- private void writeHead(String indexType, Map<String, Pair<Integer,
Integer>> bodyInfo)
+ private void writeHead(Map<String, Map<String, Pair<Integer,
Integer>>> bodyInfo)
throws IOException {
- int headLength = calculateHeadLength(indexType, bodyInfo);
+ int headLength = calculateHeadLength(bodyInfo);
// writeMagic
dataOutputStream.writeLong(MAGIC);
@@ -149,32 +167,48 @@ public final class FileIndexFormat {
dataOutputStream.writeInt(Version.V_1.version());
// writeHeadLength
dataOutputStream.writeInt(headLength);
- // writeIndexType
- dataOutputStream.writeUTF(indexType);
// writeColumnSize
dataOutputStream.writeInt(bodyInfo.size());
- // writeColumnInfo, offset = headLength
- for (Map.Entry<String, Pair<Integer, Integer>> entry :
bodyInfo.entrySet()) {
+ for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entry :
+ bodyInfo.entrySet()) {
+ // writeColumnName
dataOutputStream.writeUTF(entry.getKey());
- dataOutputStream.writeInt(entry.getValue().getLeft() +
headLength);
- dataOutputStream.writeInt(entry.getValue().getRight());
+ // writeIndexTypeSize
+ dataOutputStream.writeInt(entry.getValue().size());
+ // writeColumnInfo, offset = headLength
+ for (Map.Entry<String, Pair<Integer, Integer>> indexEntry :
+ entry.getValue().entrySet()) {
+ dataOutputStream.writeUTF(indexEntry.getKey());
+ dataOutputStream.writeInt(indexEntry.getValue().getLeft()
+ headLength);
+
dataOutputStream.writeInt(indexEntry.getValue().getRight());
+ }
}
// writeRedundantLength
dataOutputStream.writeInt(REDUNDANT_LENGTH);
}
- private int calculateHeadLength(
- String indexType, Map<String, Pair<Integer, Integer>>
bodyInfo) throws IOException {
+ private int calculateHeadLength(Map<String, Map<String, Pair<Integer,
Integer>>> bodyInfo)
+ throws IOException {
// magic 8 bytes, version 4 bytes, head length 4 bytes,
// column size 4 bytes, body info start&end 8 bytes per
- // item, redundant length 4 bytes;
- int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4;
+ // column-index, index type size 4 bytes per column, redundant
length 4 bytes;
+ int baseLength =
+ 8
+ + 4
+ + 4
+ + 4
+ +
bodyInfo.values().stream().mapToInt(Map::size).sum() * 8
+ + bodyInfo.size() * 4
+ + 4;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput dataOutput = new DataOutputStream(baos);
- dataOutput.writeUTF(indexType);
- for (String s : bodyInfo.keySet()) {
- dataOutput.writeUTF(s);
+ for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entry :
+ bodyInfo.entrySet()) {
+ dataOutput.writeUTF(entry.getKey());
+ for (String s : entry.getValue().keySet()) {
+ dataOutput.writeUTF(s);
+ }
}
return baseLength + baos.size();
@@ -191,9 +225,8 @@ public final class FileIndexFormat {
private final SeekableInputStream seekableInputStream;
// get header and cache it.
- private final Map<String, Pair<Integer, Integer>> header = new
HashMap<>();
+ private final Map<String, Map<String, Pair<Integer, Integer>>> header
= new HashMap<>();
private final Map<String, DataField> fields = new HashMap<>();
- private final String type;
public Reader(SeekableInputStream seekableInputStream, RowType
fileRowType) {
this.seekableInputStream = seekableInputStream;
@@ -221,15 +254,19 @@ public final class FileIndexFormat {
try (DataInputStream dataInput =
new DataInputStream(new ByteArrayInputStream(head))) {
- this.type = dataInput.readUTF();
int columnSize = dataInput.readInt();
for (int i = 0; i < columnSize; i++) {
- this.header.put(
- dataInput.readUTF(),
- Pair.of(dataInput.readInt(),
dataInput.readInt()));
+ String columnName = dataInput.readUTF();
+ int indexSize = dataInput.readInt();
+ Map<String, Pair<Integer, Integer>> indexMap =
+ this.header.computeIfAbsent(columnName, n ->
new HashMap<>());
+ for (int j = 0; j < indexSize; j++) {
+ indexMap.put(
+ dataInput.readUTF(),
+ Pair.of(dataInput.readInt(),
dataInput.readInt()));
+ }
}
}
-
} catch (IOException e) {
IOUtils.closeQuietly(seekableInputStream);
throw new RuntimeException(
@@ -237,13 +274,25 @@ public final class FileIndexFormat {
}
}
- public FileIndexReader readColumnIndex(String columnName) {
+ public List<FileIndexReader> readColumnIndex(String columnName) {
+ return Optional.ofNullable(header.getOrDefault(columnName, null))
+ .map(
+ f ->
+ f.keySet().stream()
+ .map(
+ indexType ->
+
readColumnIndex(columnName, indexType))
+ .collect(Collectors.toList()))
+ .orElse(Collections.emptyList());
+ }
+
+ public FileIndexReader readColumnIndex(String columnName, String
indexType) {
- return readColumnInputStream(columnName)
+ return readColumnInputStream(columnName, indexType)
.map(
serializedBytes ->
FileIndexer.create(
- type,
+ indexType,
fields.get(columnName).type(),
new Options())
.createReader(serializedBytes))
@@ -251,8 +300,9 @@ public final class FileIndexFormat {
}
@VisibleForTesting
- Optional<byte[]> readColumnInputStream(String columnName) {
+ Optional<byte[]> readColumnInputStream(String columnName, String
indexType) {
return Optional.ofNullable(header.getOrDefault(columnName, null))
+ .map(m -> m.getOrDefault(indexType, null))
.map(
startAndLength -> {
byte[] b = new byte[startAndLength.getRight()];
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index b07c6b8f0..06a691848 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -116,11 +116,11 @@ public class FileIndexPredicate implements Closeable {
private static class FileIndexFieldPredicate implements
PredicateVisitor<Boolean> {
private final String columnName;
- private final FileIndexReader fileIndexReader;
+ private final List<FileIndexReader> fileIndexReaders;
- public FileIndexFieldPredicate(String columnName, FileIndexReader
fileIndexReader) {
+ public FileIndexFieldPredicate(String columnName,
List<FileIndexReader> fileIndexReaders) {
this.columnName = columnName;
- this.fileIndexReader = fileIndexReader;
+ this.fileIndexReaders = fileIndexReaders;
}
public Boolean test(Predicate predicate) {
@@ -130,13 +130,15 @@ public class FileIndexPredicate implements Closeable {
@Override
public Boolean visit(LeafPredicate predicate) {
if (columnName.equals(predicate.fieldName())) {
- return predicate
- .function()
- .visit(
- fileIndexReader,
- new FieldRef(
- predicate.index(),
predicate.fieldName(), predicate.type()),
- predicate.literals());
+ FieldRef fieldRef =
+ new FieldRef(predicate.index(), predicate.fieldName(),
predicate.type());
+ for (FileIndexReader fileIndexReader : fileIndexReaders) {
+ if (!predicate
+ .function()
+ .visit(fileIndexReader, fieldRef,
predicate.literals())) {
+ return false;
+ }
+ }
}
return true;
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
index 0f157ae99..8aaecb152 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
@@ -43,13 +43,17 @@ public class FileIndexFormatFormatTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos);
- String type = randomString(RANDOM.nextInt(100));
- Map<String, byte[]> indexes = new HashMap<>();
- for (int i = 0; i < RANDOM.nextInt(1000); i++) {
- indexes.put(randomString(RANDOM.nextInt(20)),
randomBytes(RANDOM.nextInt(100000)));
+ Map<String, Map<String, byte[]>> indexes = new HashMap<>();
+ for (int j = 0; j < RANDOM.nextInt(1000); j++) {
+ String type = randomString(RANDOM.nextInt(100));
+ Map<String, byte[]> typeIndex = indexes.computeIfAbsent(type, t ->
new HashMap<>());
+ for (int i = 0; i < RANDOM.nextInt(1000); i++) {
+ typeIndex.put(
+ randomString(RANDOM.nextInt(20)),
randomBytes(RANDOM.nextInt(100000)));
+ }
}
- writer.writeColumnIndex(type, indexes);
+ writer.writeColumnIndexes(indexes);
writer.close();
byte[] indexBytes = baos.toByteArray();
@@ -58,9 +62,14 @@ public class FileIndexFormatFormatTest {
FileIndexFormat.createReader(
new ByteArraySeekableStream(indexBytes),
RowType.builder().build());
- for (String s : indexes.keySet()) {
- byte[] b =
reader.readColumnInputStream(s).orElseThrow(RuntimeException::new);
- Assertions.assertThat(b).containsExactly(indexes.get(s));
+ for (Map.Entry<String, Map<String, byte[]>> entry :
indexes.entrySet()) {
+ String column = entry.getKey();
+ for (String type : entry.getValue().keySet()) {
+ byte[] b =
+ reader.readColumnInputStream(column, type)
+ .orElseThrow(RuntimeException::new);
+
Assertions.assertThat(b).containsExactly(indexes.get(column).get(type));
+ }
}
}
}