This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f70f9d02b [core] Introduce File index format for data skipping (#3068)
f70f9d02b is described below

commit f70f9d02b50cfe4438c7b67da1fecac078879ce6
Author: YeJunHao <[email protected]>
AuthorDate: Wed Mar 27 16:42:19 2024 +0800

    [core] Introduce File index format for data skipping (#3068)
---
 .../apache/paimon/fileindex/FileIndexFormat.java   | 280 +++++++++++++++++++++
 .../paimon/fileindex/FileIndexPredicate.java       | 165 ++++++++++++
 .../apache/paimon/fileindex/FileIndexReader.java   | 108 ++++++++
 .../apache/paimon/fileindex/FileIndexWriter.java   |  27 ++
 .../org/apache/paimon/fileindex/FileIndexer.java   |  36 +++
 .../apache/paimon/fs/ByteArraySeekableStream.java  | 105 ++++++++
 .../fileindex/FileIndexFormatFormatTest.java       |  66 +++++
 .../paimon/fs/ByteArraySeekableStreamTest.java     |  72 ++++++
 .../java/org/apache/paimon/utils/RandomUtil.java   |  43 ++++
 .../apache/paimon/sort/zorder/ZIndexerTest.java    |  12 +-
 .../apache/paimon/utils/TestZOrderByteUtil.java    |  28 +--
 11 files changed, 911 insertions(+), 31 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
new file mode 100644
index 000000000..c9b827ee7
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * File index file format. Put all column and offset in the header.
+ *
+ * <pre>
+ * _______________________________________    _____________________
+ * |     magic    |version|head length |
+ * |-------------------------------------|
+ * |   index type        |body info size|
+ * |-------------------------------------|
+ * | column name 1 |start pos |length  |
+ * |-------------------------------------|            HEAD
+ * | column name 2 |start pos |length  |
+ * |-------------------------------------|
+ * | column name 3 |start pos |length  |
+ * |-------------------------------------|
+ * |                 ...                 |
+ * |-------------------------------------|
+ * |                 ...                 |
+ * |-------------------------------------|
+ * |  redundant length |redundant bytes |
+ * |-------------------------------------|    ---------------------
+ * |                BODY                 |
+ * |                BODY                 |
+ * |                BODY                 |             BODY
+ * |                BODY                 |
+ * |_____________________________________|    _____________________
+ *
+ * magic:                            8 bytes long
+ * version:                          4 bytes int
+ * head length:                      4 bytes int
+ * index type:                       var bytes utf (length + bytes)
+ * body info size:                   4 bytes int (how many column items below)
+ * column name:                      var bytes utf
+ * start pos:                        4 bytes int
+ * length:                           4 bytes int
+ * redundant length:                 4 bytes int (for compatibility with later 
versions, in this version, content is zero)
+ * redundant bytes:                  var bytes (for compatibility with later 
version, in this version, is empty)
+ * BODY:                             column bytes + column bytes + column 
bytes + .......
+ *
+ * </pre>
+ */
+public final class FileIndexFormat {
+
+    private static final long MAGIC = 1493475289347502L;
+
+    enum Version {
+        V_1(1);
+
+        private final int version;
+
+        Version(int version) {
+            this.version = version;
+        }
+
+        public int version() {
+            return version;
+        }
+    }
+
+    public static Writer createWriter(OutputStream outputStream) {
+        return new Writer(outputStream);
+    }
+
+    public static Reader createReader(SeekableInputStream inputStream, RowType 
fileRowType) {
+        return new Reader(inputStream, fileRowType);
+    }
+
+    /** Writer for file index file. */
+    public static class Writer implements Closeable {
+
+        private final DataOutputStream dataOutputStream;
+
+        // for version compatible
+        private static final int REDUNDANT_LENGTH = 0;
+
+        public Writer(OutputStream outputStream) {
+            this.dataOutputStream = new DataOutputStream(outputStream);
+        }
+
+        public void writeColumnIndex(String indexType, Map<String, byte[]> 
bytesMap)
+                throws IOException {
+
+            Map<String, Pair<Integer, Integer>> bodyInfo = new HashMap<>();
+
+            // construct body
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
+            for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
+                int startPosition = baos.size();
+                baos.write(entry.getValue());
+                bodyInfo.put(entry.getKey(), Pair.of(startPosition, 
baos.size() - startPosition));
+            }
+            byte[] body = baos.toByteArray();
+
+            writeHead(indexType, bodyInfo);
+
+            // writeBody
+            dataOutputStream.write(body);
+        }
+
+        private void writeHead(String indexType, Map<String, Pair<Integer, 
Integer>> bodyInfo)
+                throws IOException {
+
+            int headLength = calculateHeadLength(indexType, bodyInfo);
+
+            // writeMagic
+            dataOutputStream.writeLong(MAGIC);
+            // writeVersion
+            dataOutputStream.writeInt(Version.V_1.version());
+            // writeHeadLength
+            dataOutputStream.writeInt(headLength);
+            // writeIndexType
+            dataOutputStream.writeUTF(indexType);
+            // writeColumnSize
+            dataOutputStream.writeInt(bodyInfo.size());
+            // writeColumnInfo, offset = headLength
+            for (Map.Entry<String, Pair<Integer, Integer>> entry : 
bodyInfo.entrySet()) {
+                dataOutputStream.writeUTF(entry.getKey());
+                dataOutputStream.writeInt(entry.getValue().getLeft() + 
headLength);
+                dataOutputStream.writeInt(entry.getValue().getRight());
+            }
+            // writeRedundantLength
+            dataOutputStream.writeInt(REDUNDANT_LENGTH);
+        }
+
+        private int calculateHeadLength(
+                String indexType, Map<String, Pair<Integer, Integer>> 
bodyInfo) throws IOException {
+            // magic 8 bytes, version 4 bytes, head length 4 bytes,
+            // column size 4 bytes, body info start&end 8 bytes per
+            // item, redundant length 4 bytes;
+            int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4;
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutput dataOutput = new DataOutputStream(baos);
+            dataOutput.writeUTF(indexType);
+            for (String s : bodyInfo.keySet()) {
+                dataOutput.writeUTF(s);
+            }
+
+            return baseLength + baos.size();
+        }
+
+        @Override
+        public void close() throws IOException {
+            IOUtils.closeQuietly(dataOutputStream);
+        }
+    }
+
+    /** Reader for file index file. */
+    public static class Reader implements Closeable {
+
+        private final SeekableInputStream seekableInputStream;
+        // get header and cache it.
+        private final Map<String, Pair<Integer, Integer>> header = new 
HashMap<>();
+        private final Map<String, DataField> fields = new HashMap<>();
+        private final String type;
+
+        public Reader(SeekableInputStream seekableInputStream, RowType 
fileRowType) {
+            this.seekableInputStream = seekableInputStream;
+            DataInputStream dataInputStream = new 
DataInputStream(seekableInputStream);
+            fileRowType.getFields().forEach(field -> 
this.fields.put(field.name(), field));
+            try {
+                long magic = dataInputStream.readLong();
+                if (magic != MAGIC) {
+                    throw new RuntimeException("This file is not file index 
file.");
+                }
+
+                int version = dataInputStream.readInt();
+                if (version != Version.V_1.version()) {
+                    throw new RuntimeException(
+                            "This index file is version of "
+                                    + version
+                                    + ", not in supported version list ["
+                                    + Version.V_1.version()
+                                    + "]");
+                }
+
+                int headLength = dataInputStream.readInt();
+                byte[] head = new byte[headLength - 8 - 4 - 4];
+                dataInputStream.readFully(head);
+
+                try (DataInputStream dataInput =
+                        new DataInputStream(new ByteArrayInputStream(head))) {
+                    this.type = dataInput.readUTF();
+                    int columnSize = dataInput.readInt();
+                    for (int i = 0; i < columnSize; i++) {
+                        this.header.put(
+                                dataInput.readUTF(),
+                                Pair.of(dataInput.readInt(), 
dataInput.readInt()));
+                    }
+                }
+
+            } catch (IOException e) {
+                IOUtils.closeQuietly(seekableInputStream);
+                throw new RuntimeException(
+                        "Exception happens while construct file index 
reader.", e);
+            }
+        }
+
+        public FileIndexReader readColumnIndex(String columnName) {
+
+            return readColumnInputStream(columnName)
+                    .map(
+                            serializedBytes ->
+                                    FileIndexer.create(type, 
fields.get(columnName).type())
+                                            .createReader()
+                                            .recoverFrom(serializedBytes))
+                    .orElse(null);
+        }
+
+        @VisibleForTesting
+        Optional<byte[]> readColumnInputStream(String columnName) {
+            return Optional.ofNullable(header.getOrDefault(columnName, null))
+                    .map(
+                            startAndLength -> {
+                                byte[] b = new byte[startAndLength.getRight()];
+                                try {
+                                    
seekableInputStream.seek(startAndLength.getLeft());
+                                    int n = 0;
+                                    int len = b.length;
+                                    // read fully until b is full else throw.
+                                    while (n < len) {
+                                        int count = 
seekableInputStream.read(b, n, len - n);
+                                        if (count < 0) {
+                                            throw new EOFException();
+                                        }
+                                        n += count;
+                                    }
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                                return b;
+                            });
+        }
+
+        @Override
+        public void close() throws IOException {
+            IOUtils.closeQuietly(seekableInputStream);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
new file mode 100644
index 000000000..b07c6b8f0
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Or;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utils to check secondary index (e.g. bloom filter) predicate. */
+public class FileIndexPredicate implements Closeable {
+
+    private final FileIndexFormat.Reader reader;
+    private final Map<String, FileIndexFieldPredicate> fieldPredicates = new 
HashMap<>();
+
+    public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) 
throws IOException {
+        this(fileIO.newInputStream(path), fileRowType);
+    }
+
+    public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
+        this(new ByteArraySeekableStream(serializedBytes), fileRowType);
+    }
+
+    public FileIndexPredicate(SeekableInputStream inputStream, RowType 
fileRowType) {
+        this.reader = FileIndexFormat.createReader(inputStream, fileRowType);
+    }
+
+    public boolean testPredicate(@Nullable Predicate filePredicate) {
+        if (filePredicate == null) {
+            return true;
+        }
+
+        Set<String> requredFieldNames = getRequiredNames(filePredicate);
+
+        List<FileIndexFieldPredicate> testWorkers =
+                requredFieldNames.stream()
+                        .map(
+                                cname ->
+                                        fieldPredicates.computeIfAbsent(
+                                                cname,
+                                                k ->
+                                                        new 
FileIndexFieldPredicate(
+                                                                cname,
+                                                                
reader.readColumnIndex(cname))))
+                        .collect(Collectors.toList());
+
+        for (FileIndexFieldPredicate testWorker : testWorkers) {
+            if (!testWorker.test(filePredicate)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private Set<String> getRequiredNames(Predicate filePredicate) {
+        return filePredicate.visit(
+                new PredicateVisitor<Set<String>>() {
+                    final Set<String> names = new HashSet<>();
+
+                    @Override
+                    public Set<String> visit(LeafPredicate predicate) {
+                        names.add(predicate.fieldName());
+                        return names;
+                    }
+
+                    @Override
+                    public Set<String> visit(CompoundPredicate predicate) {
+                        for (Predicate child : predicate.children()) {
+                            child.visit(this);
+                        }
+                        return names;
+                    }
+                });
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.reader.close();
+    }
+
+    /** Predicate test worker. */
+    private static class FileIndexFieldPredicate implements 
PredicateVisitor<Boolean> {
+
+        private final String columnName;
+        private final FileIndexReader fileIndexReader;
+
+        public FileIndexFieldPredicate(String columnName, FileIndexReader 
fileIndexReader) {
+            this.columnName = columnName;
+            this.fileIndexReader = fileIndexReader;
+        }
+
+        public Boolean test(Predicate predicate) {
+            return predicate.visit(this);
+        }
+
+        @Override
+        public Boolean visit(LeafPredicate predicate) {
+            if (columnName.equals(predicate.fieldName())) {
+                return predicate
+                        .function()
+                        .visit(
+                                fileIndexReader,
+                                new FieldRef(
+                                        predicate.index(), 
predicate.fieldName(), predicate.type()),
+                                predicate.literals());
+            }
+            return true;
+        }
+
+        @Override
+        public Boolean visit(CompoundPredicate predicate) {
+
+            if (predicate.function() instanceof Or) {
+                for (Predicate predicate1 : predicate.children()) {
+                    if (predicate1.visit(this)) {
+                        return true;
+                    }
+                }
+                return false;
+
+            } else {
+                for (Predicate predicate1 : predicate.children()) {
+                    if (!predicate1.visit(this)) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
new file mode 100644
index 000000000..6d9404564
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FunctionVisitor;
+
+import java.util.List;
+
+/**
+ * Read file index from serialized bytes. Return true, means we need to search 
this file, else means
+ * needn't.
+ */
+public interface FileIndexReader extends FunctionVisitor<Boolean> {
+
+    FileIndexReader recoverFrom(byte[] serializedBytes);
+
+    @Override
+    default Boolean visitIsNotNull(FieldRef fieldRef) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitIsNull(FieldRef fieldRef) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
+        for (Object key : literals) {
+            if (visitEqual(fieldRef, key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    default Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        for (Object key : literals) {
+            if (visitNotEqual(fieldRef, key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    default Boolean visitAnd(List<Boolean> children) {
+        throw new UnsupportedOperationException("Should not invoke this");
+    }
+
+    @Override
+    default Boolean visitOr(List<Boolean> children) {
+        throw new UnsupportedOperationException("Should not invoke this");
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
new file mode 100644
index 000000000..9eab19cde
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+/** To write file index. */
+public interface FileIndexWriter {
+
+    void write(Object key);
+
+    byte[] serializedBytes();
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
new file mode 100644
index 000000000..e7e3d40bf
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.types.DataType;
+
+/** File index interface. To build a file index. */
+public interface FileIndexer {
+
+    FileIndexWriter createWriter();
+
+    FileIndexReader createReader();
+
+    static FileIndexer create(String type, DataType dataType) {
+        switch (type) {
+            default:
+                throw new RuntimeException("Doesn't support filter type: " + 
type);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/ByteArraySeekableStream.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/ByteArraySeekableStream.java
new file mode 100644
index 000000000..d6536927b
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/ByteArraySeekableStream.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+/** Wrap byte buf to a seekable input stream. */
+public class ByteArraySeekableStream extends SeekableInputStream {
+
+    private final ByteArrayStream byteArrayStream;
+
+    public ByteArraySeekableStream(byte[] buf) {
+        this.byteArrayStream = new ByteArrayStream(buf);
+    }
+
+    @Override
+    public void seek(long desired) throws IOException {
+        byteArrayStream.seek((int) desired);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return byteArrayStream.getPos();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return byteArrayStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return byteArrayStream.read(b, off, len);
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return byteArrayStream.read(b);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        return byteArrayStream.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+        return byteArrayStream.available();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        byteArrayStream.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        byteArrayStream.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return byteArrayStream.markSupported();
+    }
+
+    @Override
+    public void close() throws IOException {
+        byteArrayStream.close();
+    }
+
+    private static class ByteArrayStream extends ByteArrayInputStream {
+        public ByteArrayStream(byte[] buf) {
+            super(buf);
+        }
+
+        public void seek(int position) throws IOException {
+            if (position >= count) {
+                throw new EOFException("Can't seek position: " + position + ", 
length is " + count);
+            }
+            pos = position;
+        }
+
+        public long getPos() {
+            return pos;
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
new file mode 100644
index 000000000..0f157ae99
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.types.RowType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.paimon.utils.RandomUtil.randomBytes;
+import static org.apache.paimon.utils.RandomUtil.randomString;
+
+/** Test for {@link FileIndexFormat}. */
+public class FileIndexFormatFormatTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testWriteRead() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos);
+
+        String type = randomString(RANDOM.nextInt(100));
+        Map<String, byte[]> indexes = new HashMap<>();
+        for (int i = 0; i < RANDOM.nextInt(1000); i++) {
+            indexes.put(randomString(RANDOM.nextInt(20)), 
randomBytes(RANDOM.nextInt(100000)));
+        }
+
+        writer.writeColumnIndex(type, indexes);
+        writer.close();
+
+        byte[] indexBytes = baos.toByteArray();
+
+        FileIndexFormat.Reader reader =
+                FileIndexFormat.createReader(
+                        new ByteArraySeekableStream(indexBytes), 
RowType.builder().build());
+
+        for (String s : indexes.keySet()) {
+            byte[] b = 
reader.readColumnInputStream(s).orElseThrow(RuntimeException::new);
+            Assertions.assertThat(b).containsExactly(indexes.get(s));
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java
new file mode 100644
index 000000000..e725e2a3d
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.paimon.utils.RandomUtil.randomBytes;
+
+/** Test for {@link ByteArraySeekableStream}. */
+public class ByteArraySeekableStreamTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testBasic() throws IOException {
+        int bl = 100000;
+        byte[] b = randomBytes(bl);
+        ByteArraySeekableStream byteArraySeekableStream = new 
ByteArraySeekableStream(b);
+
+        
Assertions.assertThat(byteArraySeekableStream.available()).isEqualTo(b.length);
+
+        for (int i = 0; i < RANDOM.nextInt(1000); i++) {
+            int position = RANDOM.nextInt(bl);
+            int length = RANDOM.nextInt(b.length - position - 1);
+            byte[] expected = new byte[length];
+            System.arraycopy(b, position, expected, 0, length);
+
+            byte[] actual = new byte[length];
+            byteArraySeekableStream.seek(position);
+            byteArraySeekableStream.read(actual);
+            Assertions.assertThat(actual).containsExactly(expected);
+        }
+
+        for (int i = 0; i < RANDOM.nextInt(1000); i++) {
+            int position = RANDOM.nextInt(bl);
+            byteArraySeekableStream.seek(position);
+            for (int j = 0; j < 100; j++) {
+                Assertions.assertThat(b[position + j])
+                        .isEqualTo((byte) byteArraySeekableStream.read());
+            }
+        }
+    }
+
+    @Test
+    public void testThrow() {
+        int bl = 10;
+        byte[] b = randomBytes(bl);
+        ByteArraySeekableStream byteArraySeekableStream = new 
ByteArraySeekableStream(b);
+        Assertions.assertThatCode(() -> byteArraySeekableStream.seek(10))
+                .hasMessage("Can't seek position: 10, length is 10");
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/RandomUtil.java 
b/paimon-common/src/test/java/org/apache/paimon/utils/RandomUtil.java
new file mode 100644
index 000000000..e6d6805b2
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/RandomUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Random;
+
+/** Utils for tests. */
+public class RandomUtil {
+
+    private static final Random RANDOM = new Random();
+
+    public static byte[] randomBytes(int length) {
+        byte[] b = new byte[length];
+        RANDOM.nextBytes(b);
+        return b;
+    }
+
+    public static String randomString(int length) {
+        byte[] buffer = new byte[length];
+
+        for (int i = 0; i < length; i += 1) {
+            buffer[i] = (byte) ('a' + RANDOM.nextInt(26));
+        }
+
+        return new String(buffer);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
index 801a1927d..c94327817 100644
--- a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
@@ -34,6 +34,8 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.paimon.utils.RandomUtil.randomString;
+
 /** Tests for {@link ZIndexer}. */
 public class ZIndexerTest {
 
@@ -135,14 +137,4 @@ public class ZIndexerTest {
             }
         }
     }
-
-    public static String randomString(int length) {
-        byte[] buffer = new byte[length];
-
-        for (int i = 0; i < length; i += 1) {
-            buffer[i] = (byte) ('a' + RANDOM.nextInt(26));
-        }
-
-        return new String(buffer);
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
index dded0cf7e..92786b701 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
@@ -26,6 +26,9 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.paimon.utils.RandomUtil.randomBytes;
+import static org.apache.paimon.utils.RandomUtil.randomString;
+
 /* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
@@ -352,8 +355,8 @@ public class TestZOrderByteUtil {
         ByteBuffer aBuffer = ByteBuffer.allocate(128);
         ByteBuffer bBuffer = ByteBuffer.allocate(128);
         for (int i = 0; i < NUM_TESTS; i++) {
-            String aString = randomString();
-            String bString = randomString();
+            String aString = randomString(50);
+            String bString = randomString(50);
             int stringCompare = Integer.signum(aString.compareTo(bString));
             byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, 
aBuffer).array();
             byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, 
bBuffer).array();
@@ -380,8 +383,8 @@ public class TestZOrderByteUtil {
         ByteBuffer aBuffer = ByteBuffer.allocate(128);
         ByteBuffer bBuffer = ByteBuffer.allocate(128);
         for (int i = 0; i < NUM_TESTS; i++) {
-            byte[] aBytesRaw = randomBytes();
-            byte[] bBytesRaw = randomBytes();
+            byte[] aBytesRaw = randomBytes(50);
+            byte[] bBytesRaw = randomBytes(50);
             int stringCompare =
                     Integer.signum(
                             UnsignedBytes.lexicographicalComparator()
@@ -405,21 +408,4 @@ public class TestZOrderByteUtil {
                     byteCompare);
         }
     }
-
-    private byte[] randomBytes() {
-        byte[] binary = new byte[random.nextInt(50)];
-        random.nextBytes(binary);
-        return binary;
-    }
-
-    private String randomString() {
-        int length = random.nextInt(50);
-        byte[] buffer = new byte[length];
-
-        for (int i = 0; i < length; i += 1) {
-            buffer[i] = (byte) ('a' + random.nextInt(26));
-        }
-
-        return new String(buffer);
-    }
 }


Reply via email to