This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c49750a4 [core] File index format should include all type index and columns (#3190)
5c49750a4 is described below

commit 5c49750a48f812ec2e483d00d0b49562cad4fa3f
Author: YeJunHao <[email protected]>
AuthorDate: Wed Apr 10 16:47:23 2024 +0800

    [core] File index format should include all type index and columns (#3190)
---
 .../apache/paimon/fileindex/FileIndexFormat.java   | 126 ++++++++++++++-------
 .../paimon/fileindex/FileIndexPredicate.java       |  22 ++--
 .../fileindex/FileIndexFormatFormatTest.java       |  25 ++--
 3 files changed, 117 insertions(+), 56 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index a7a3c534e..ec809dff4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -35,9 +35,12 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * File index file format. Put all column and offset in the header.
@@ -46,13 +49,23 @@ import java.util.Optional;
  * _______________________________________    _____________________
  * |     magic    |version|head length |
  * |-------------------------------------|
- * |   index type        |body info size|
+ * |              column size            |
  * |-------------------------------------|
- * | column name 1 |start pos |length  |
+ * |   column 1        |   index size   |
+ * |-------------------------------------|
+ * |  index name 1 |start pos |length  |
+ * |-------------------------------------|
+ * |  index name 2 |start pos |length  |
+ * |-------------------------------------|
+ * |  index name 3 |start pos |length  |
  * |-------------------------------------|            HEAD
- * | column name 2 |start pos |length  |
+ * |   column 2        |   index size   |
+ * |-------------------------------------|
+ * |  index name 1 |start pos |length  |
  * |-------------------------------------|
- * | column name 3 |start pos |length  |
+ * |  index name 2 |start pos |length  |
+ * |-------------------------------------|
+ * |  index name 3 |start pos |length  |
  * |-------------------------------------|
  * |                 ...                 |
  * |-------------------------------------|
@@ -118,30 +131,35 @@ public final class FileIndexFormat {
             this.dataOutputStream = new DataOutputStream(outputStream);
         }
 
-        public void writeColumnIndex(String indexType, Map<String, byte[]> 
bytesMap)
+        public void writeColumnIndexes(Map<String, Map<String, byte[]>> 
indexes)
                 throws IOException {
 
-            Map<String, Pair<Integer, Integer>> bodyInfo = new HashMap<>();
+            Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new 
HashMap<>();
 
             // construct body
             ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
-            for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
-                int startPosition = baos.size();
-                baos.write(entry.getValue());
-                bodyInfo.put(entry.getKey(), Pair.of(startPosition, 
baos.size() - startPosition));
+            for (Map.Entry<String, Map<String, byte[]>> columnMap : 
indexes.entrySet()) {
+                Map<String, Pair<Integer, Integer>> innerMap =
+                        bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new 
HashMap<>());
+                Map<String, byte[]> bytesMap = columnMap.getValue();
+                for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
+                    int startPosition = baos.size();
+                    baos.write(entry.getValue());
+                    innerMap.put(
+                            entry.getKey(), Pair.of(startPosition, baos.size() 
- startPosition));
+                }
             }
             byte[] body = baos.toByteArray();
-
-            writeHead(indexType, bodyInfo);
+            writeHead(bodyInfo);
 
             // writeBody
             dataOutputStream.write(body);
         }
 
-        private void writeHead(String indexType, Map<String, Pair<Integer, 
Integer>> bodyInfo)
+        private void writeHead(Map<String, Map<String, Pair<Integer, 
Integer>>> bodyInfo)
                 throws IOException {
 
-            int headLength = calculateHeadLength(indexType, bodyInfo);
+            int headLength = calculateHeadLength(bodyInfo);
 
             // writeMagic
             dataOutputStream.writeLong(MAGIC);
@@ -149,32 +167,48 @@ public final class FileIndexFormat {
             dataOutputStream.writeInt(Version.V_1.version());
             // writeHeadLength
             dataOutputStream.writeInt(headLength);
-            // writeIndexType
-            dataOutputStream.writeUTF(indexType);
             // writeColumnSize
             dataOutputStream.writeInt(bodyInfo.size());
-            // writeColumnInfo, offset = headLength
-            for (Map.Entry<String, Pair<Integer, Integer>> entry : 
bodyInfo.entrySet()) {
+            for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entry :
+                    bodyInfo.entrySet()) {
+                // writeColumnName
                 dataOutputStream.writeUTF(entry.getKey());
-                dataOutputStream.writeInt(entry.getValue().getLeft() + 
headLength);
-                dataOutputStream.writeInt(entry.getValue().getRight());
+                // writeIndexTypeSize
+                dataOutputStream.writeInt(entry.getValue().size());
+                // writeColumnInfo, offset = headLength
+                for (Map.Entry<String, Pair<Integer, Integer>> indexEntry :
+                        entry.getValue().entrySet()) {
+                    dataOutputStream.writeUTF(indexEntry.getKey());
+                    dataOutputStream.writeInt(indexEntry.getValue().getLeft() 
+ headLength);
+                    
dataOutputStream.writeInt(indexEntry.getValue().getRight());
+                }
             }
             // writeRedundantLength
             dataOutputStream.writeInt(REDUNDANT_LENGTH);
         }
 
-        private int calculateHeadLength(
-                String indexType, Map<String, Pair<Integer, Integer>> 
bodyInfo) throws IOException {
+        private int calculateHeadLength(Map<String, Map<String, Pair<Integer, 
Integer>>> bodyInfo)
+                throws IOException {
             // magic 8 bytes, version 4 bytes, head length 4 bytes,
             // column size 4 bytes, body info start&end 8 bytes per
-            // item, redundant length 4 bytes;
-            int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4;
+            // column-index, index type size 4 bytes per column, redundant 
length 4 bytes;
+            int baseLength =
+                    8
+                            + 4
+                            + 4
+                            + 4
+                            + 
bodyInfo.values().stream().mapToInt(Map::size).sum() * 8
+                            + bodyInfo.size() * 4
+                            + 4;
 
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutput dataOutput = new DataOutputStream(baos);
-            dataOutput.writeUTF(indexType);
-            for (String s : bodyInfo.keySet()) {
-                dataOutput.writeUTF(s);
+            for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entry :
+                    bodyInfo.entrySet()) {
+                dataOutput.writeUTF(entry.getKey());
+                for (String s : entry.getValue().keySet()) {
+                    dataOutput.writeUTF(s);
+                }
             }
 
             return baseLength + baos.size();
@@ -191,9 +225,8 @@ public final class FileIndexFormat {
 
         private final SeekableInputStream seekableInputStream;
         // get header and cache it.
-        private final Map<String, Pair<Integer, Integer>> header = new 
HashMap<>();
+        private final Map<String, Map<String, Pair<Integer, Integer>>> header 
= new HashMap<>();
         private final Map<String, DataField> fields = new HashMap<>();
-        private final String type;
 
         public Reader(SeekableInputStream seekableInputStream, RowType 
fileRowType) {
             this.seekableInputStream = seekableInputStream;
@@ -221,15 +254,19 @@ public final class FileIndexFormat {
 
                 try (DataInputStream dataInput =
                         new DataInputStream(new ByteArrayInputStream(head))) {
-                    this.type = dataInput.readUTF();
                     int columnSize = dataInput.readInt();
                     for (int i = 0; i < columnSize; i++) {
-                        this.header.put(
-                                dataInput.readUTF(),
-                                Pair.of(dataInput.readInt(), 
dataInput.readInt()));
+                        String columnName = dataInput.readUTF();
+                        int indexSize = dataInput.readInt();
+                        Map<String, Pair<Integer, Integer>> indexMap =
+                                this.header.computeIfAbsent(columnName, n -> 
new HashMap<>());
+                        for (int j = 0; j < indexSize; j++) {
+                            indexMap.put(
+                                    dataInput.readUTF(),
+                                    Pair.of(dataInput.readInt(), 
dataInput.readInt()));
+                        }
                     }
                 }
-
             } catch (IOException e) {
                 IOUtils.closeQuietly(seekableInputStream);
                 throw new RuntimeException(
@@ -237,13 +274,25 @@ public final class FileIndexFormat {
             }
         }
 
-        public FileIndexReader readColumnIndex(String columnName) {
+        public List<FileIndexReader> readColumnIndex(String columnName) {
+            return Optional.ofNullable(header.getOrDefault(columnName, null))
+                    .map(
+                            f ->
+                                    f.keySet().stream()
+                                            .map(
+                                                    indexType ->
+                                                            
readColumnIndex(columnName, indexType))
+                                            .collect(Collectors.toList()))
+                    .orElse(Collections.emptyList());
+        }
+
+        public FileIndexReader readColumnIndex(String columnName, String 
indexType) {
 
-            return readColumnInputStream(columnName)
+            return readColumnInputStream(columnName, indexType)
                     .map(
                             serializedBytes ->
                                     FileIndexer.create(
-                                                    type,
+                                                    indexType,
                                                     
fields.get(columnName).type(),
                                                     new Options())
                                             .createReader(serializedBytes))
@@ -251,8 +300,9 @@ public final class FileIndexFormat {
         }
 
         @VisibleForTesting
-        Optional<byte[]> readColumnInputStream(String columnName) {
+        Optional<byte[]> readColumnInputStream(String columnName, String 
indexType) {
             return Optional.ofNullable(header.getOrDefault(columnName, null))
+                    .map(m -> m.getOrDefault(indexType, null))
                     .map(
                             startAndLength -> {
                                 byte[] b = new byte[startAndLength.getRight()];
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index b07c6b8f0..06a691848 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -116,11 +116,11 @@ public class FileIndexPredicate implements Closeable {
     private static class FileIndexFieldPredicate implements 
PredicateVisitor<Boolean> {
 
         private final String columnName;
-        private final FileIndexReader fileIndexReader;
+        private final List<FileIndexReader> fileIndexReaders;
 
-        public FileIndexFieldPredicate(String columnName, FileIndexReader 
fileIndexReader) {
+        public FileIndexFieldPredicate(String columnName, 
List<FileIndexReader> fileIndexReaders) {
             this.columnName = columnName;
-            this.fileIndexReader = fileIndexReader;
+            this.fileIndexReaders = fileIndexReaders;
         }
 
         public Boolean test(Predicate predicate) {
@@ -130,13 +130,15 @@ public class FileIndexPredicate implements Closeable {
         @Override
         public Boolean visit(LeafPredicate predicate) {
             if (columnName.equals(predicate.fieldName())) {
-                return predicate
-                        .function()
-                        .visit(
-                                fileIndexReader,
-                                new FieldRef(
-                                        predicate.index(), 
predicate.fieldName(), predicate.type()),
-                                predicate.literals());
+                FieldRef fieldRef =
+                        new FieldRef(predicate.index(), predicate.fieldName(), 
predicate.type());
+                for (FileIndexReader fileIndexReader : fileIndexReaders) {
+                    if (!predicate
+                            .function()
+                            .visit(fileIndexReader, fieldRef, 
predicate.literals())) {
+                        return false;
+                    }
+                }
             }
             return true;
         }
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
index 0f157ae99..8aaecb152 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
@@ -43,13 +43,17 @@ public class FileIndexFormatFormatTest {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos);
 
-        String type = randomString(RANDOM.nextInt(100));
-        Map<String, byte[]> indexes = new HashMap<>();
-        for (int i = 0; i < RANDOM.nextInt(1000); i++) {
-            indexes.put(randomString(RANDOM.nextInt(20)), 
randomBytes(RANDOM.nextInt(100000)));
+        Map<String, Map<String, byte[]>> indexes = new HashMap<>();
+        for (int j = 0; j < RANDOM.nextInt(1000); j++) {
+            String type = randomString(RANDOM.nextInt(100));
+            Map<String, byte[]> typeIndex = indexes.computeIfAbsent(type, t -> 
new HashMap<>());
+            for (int i = 0; i < RANDOM.nextInt(1000); i++) {
+                typeIndex.put(
+                        randomString(RANDOM.nextInt(20)), 
randomBytes(RANDOM.nextInt(100000)));
+            }
         }
 
-        writer.writeColumnIndex(type, indexes);
+        writer.writeColumnIndexes(indexes);
         writer.close();
 
         byte[] indexBytes = baos.toByteArray();
@@ -58,9 +62,14 @@ public class FileIndexFormatFormatTest {
                 FileIndexFormat.createReader(
                         new ByteArraySeekableStream(indexBytes), 
RowType.builder().build());
 
-        for (String s : indexes.keySet()) {
-            byte[] b = 
reader.readColumnInputStream(s).orElseThrow(RuntimeException::new);
-            Assertions.assertThat(b).containsExactly(indexes.get(s));
+        for (Map.Entry<String, Map<String, byte[]>> entry : 
indexes.entrySet()) {
+            String column = entry.getKey();
+            for (String type : entry.getValue().keySet()) {
+                byte[] b =
+                        reader.readColumnInputStream(column, type)
+                                .orElseThrow(RuntimeException::new);
+                
Assertions.assertThat(b).containsExactly(indexes.get(column).get(type));
+            }
         }
     }
 }

Reply via email to