This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f70f9d02b [core] Introduce File index format for data skipping (#3068)
f70f9d02b is described below
commit f70f9d02b50cfe4438c7b67da1fecac078879ce6
Author: YeJunHao <[email protected]>
AuthorDate: Wed Mar 27 16:42:19 2024 +0800
[core] Introduce File index format for data skipping (#3068)
---
.../apache/paimon/fileindex/FileIndexFormat.java | 280 +++++++++++++++++++++
.../paimon/fileindex/FileIndexPredicate.java | 165 ++++++++++++
.../apache/paimon/fileindex/FileIndexReader.java | 108 ++++++++
.../apache/paimon/fileindex/FileIndexWriter.java | 27 ++
.../org/apache/paimon/fileindex/FileIndexer.java | 36 +++
.../apache/paimon/fs/ByteArraySeekableStream.java | 105 ++++++++
.../fileindex/FileIndexFormatFormatTest.java | 66 +++++
.../paimon/fs/ByteArraySeekableStreamTest.java | 72 ++++++
.../java/org/apache/paimon/utils/RandomUtil.java | 43 ++++
.../apache/paimon/sort/zorder/ZIndexerTest.java | 12 +-
.../apache/paimon/utils/TestZOrderByteUtil.java | 28 +--
11 files changed, 911 insertions(+), 31 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
new file mode 100644
index 000000000..c9b827ee7
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * File index file format. Put all column and offset in the header.
+ *
+ * <pre>
+ * _______________________________________ _____________________
+ * | magic |version|head length |
+ * |-------------------------------------|
+ * | index type |body info size|
+ * |-------------------------------------|
+ * | column name 1 |start pos |length |
+ * |-------------------------------------| HEAD
+ * | column name 2 |start pos |length |
+ * |-------------------------------------|
+ * | column name 3 |start pos |length |
+ * |-------------------------------------|
+ * | ... |
+ * |-------------------------------------|
+ * | ... |
+ * |-------------------------------------|
+ * | redundant length |redundant bytes |
+ * |-------------------------------------| ---------------------
+ * | BODY |
+ * | BODY |
+ * | BODY | BODY
+ * | BODY |
+ * |_____________________________________| _____________________
+ *
+ * magic: 8 bytes long
+ * version: 4 bytes int
+ * head length: 4 bytes int
+ * index type: var bytes utf (length + bytes)
+ * body info size: 4 bytes int (how many column items below)
+ * column name: var bytes utf
+ * start pos: 4 bytes int
+ * length: 4 bytes int
+ * redundant length: 4 bytes int (for compatibility with later versions; in this version the value is zero)
+ * redundant bytes: var bytes (for compatibility with later versions; in this version it is empty)
+ * BODY: column bytes + column bytes + column bytes + .......
+ *
+ * </pre>
+ */
+public final class FileIndexFormat {
+
+ private static final long MAGIC = 1493475289347502L;
+
+ /** Format versions of the file index file; the int is the value stored in the file head. */
+ enum Version {
+ V_1(1);
+
+ private final int version;
+
+ Version(int version) {
+ this.version = version;
+ }
+
+ /** Returns the integer identifier of this version. */
+ public int version() {
+ return version;
+ }
+ }
+
+ /** Creates a {@link Writer} that writes the index file to the given output stream. */
+ public static Writer createWriter(OutputStream outputStream) {
+ return new Writer(outputStream);
+ }
+
+ /**
+ * Creates a {@link Reader} over the given stream. The fields of {@code fileRowType} are
+ * looked up by column name to obtain the data type of an indexed column.
+ */
+ public static Reader createReader(SeekableInputStream inputStream, RowType
fileRowType) {
+ return new Reader(inputStream, fileRowType);
+ }
+
+    /**
+     * Writer for file index file.
+     *
+     * <p>A call to {@link #writeColumnIndex} emits the head (magic, version, head length, index
+     * type, start position and length of every column, redundant length) followed by the
+     * concatenated column bytes.
+     */
+    public static class Writer implements Closeable {
+
+        private final DataOutputStream dataOutputStream;
+
+        // Length of the reserved "redundant bytes" section, kept for version compatibility.
+        private static final int REDUNDANT_LENGTH = 0;
+
+        public Writer(OutputStream outputStream) {
+            this.dataOutputStream = new DataOutputStream(outputStream);
+        }
+
+        /**
+         * Writes the head and the body for the given per-column index bytes.
+         *
+         * @param indexType index type name stored in the head
+         * @param bytesMap column name to serialized index bytes of that column
+         * @throws IOException if writing to the underlying stream fails
+         */
+        public void writeColumnIndex(String indexType, Map<String, byte[]> bytesMap)
+                throws IOException {
+
+            Map<String, Pair<Integer, Integer>> bodyInfo = new HashMap<>();
+
+            // Build the body first so that (start, length) of every column is known before the
+            // head is written. Start positions are relative to the body at this point.
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
+            for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
+                int startPosition = baos.size();
+                baos.write(entry.getValue());
+                bodyInfo.put(entry.getKey(), Pair.of(startPosition, baos.size() - startPosition));
+            }
+            byte[] body = baos.toByteArray();
+
+            writeHead(indexType, bodyInfo);
+
+            // writeBody
+            dataOutputStream.write(body);
+        }
+
+        /** Writes the head; body-relative start positions are shifted by the head length. */
+        private void writeHead(String indexType, Map<String, Pair<Integer, Integer>> bodyInfo)
+                throws IOException {
+
+            int headLength = calculateHeadLength(indexType, bodyInfo);
+
+            // writeMagic
+            dataOutputStream.writeLong(MAGIC);
+            // writeVersion
+            dataOutputStream.writeInt(Version.V_1.version());
+            // writeHeadLength
+            dataOutputStream.writeInt(headLength);
+            // writeIndexType
+            dataOutputStream.writeUTF(indexType);
+            // writeColumnSize
+            dataOutputStream.writeInt(bodyInfo.size());
+            // writeColumnInfo, offset = headLength
+            for (Map.Entry<String, Pair<Integer, Integer>> entry : bodyInfo.entrySet()) {
+                dataOutputStream.writeUTF(entry.getKey());
+                dataOutputStream.writeInt(entry.getValue().getLeft() + headLength);
+                dataOutputStream.writeInt(entry.getValue().getRight());
+            }
+            // writeRedundantLength
+            dataOutputStream.writeInt(REDUNDANT_LENGTH);
+        }
+
+        /** Computes the total number of bytes {@link #writeHead} is going to write. */
+        private int calculateHeadLength(
+                String indexType, Map<String, Pair<Integer, Integer>> bodyInfo) throws IOException {
+            // magic 8 bytes, version 4 bytes, head length 4 bytes,
+            // column size 4 bytes, body info start & length 8 bytes per
+            // item, redundant length 4 bytes;
+            int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4;
+
+            // The encoded size of writeUTF strings is variable, so measure it by writing them
+            // into a scratch buffer.
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutput dataOutput = new DataOutputStream(baos);
+            dataOutput.writeUTF(indexType);
+            for (String s : bodyInfo.keySet()) {
+                dataOutput.writeUTF(s);
+            }
+
+            return baseLength + baos.size();
+        }
+
+        /**
+         * Flushes and closes the underlying stream.
+         *
+         * @throws IOException if the flush or close fails; this must reach the caller because
+         *     the index file may then be incomplete
+         */
+        @Override
+        public void close() throws IOException {
+            dataOutputStream.close();
+        }
+    }
+
+    /**
+     * Reader for file index file.
+     *
+     * <p>The head is parsed once in the constructor; column bytes are read on demand by seeking
+     * to the position recorded in the head.
+     */
+    public static class Reader implements Closeable {
+
+        // Fixed-size prefix of the head: magic (8) + version (4) + head length (4).
+        private static final int HEAD_PREFIX_LENGTH = 8 + 4 + 4;
+
+        private final SeekableInputStream seekableInputStream;
+        // column name -> (start position in file, length), cached from the head.
+        private final Map<String, Pair<Integer, Integer>> header = new HashMap<>();
+        private final Map<String, DataField> fields = new HashMap<>();
+        private final String type;
+
+        /**
+         * Parses the head of the index file.
+         *
+         * @throws RuntimeException if the magic, version or head length is invalid, or if
+         *     reading fails; the stream is closed in all of these cases
+         */
+        public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) {
+            this.seekableInputStream = seekableInputStream;
+            fileRowType.getFields().forEach(field -> this.fields.put(field.name(), field));
+            try {
+                this.type = readHead(new DataInputStream(seekableInputStream));
+            } catch (IOException e) {
+                IOUtils.closeQuietly(seekableInputStream);
+                throw new RuntimeException(
+                        "Exception happens while construct file index reader.", e);
+            } catch (RuntimeException e) {
+                // A malformed file must release the stream just like an I/O failure does.
+                IOUtils.closeQuietly(seekableInputStream);
+                throw e;
+            }
+        }
+
+        /** Validates magic and version, fills {@link #header} and returns the index type. */
+        private String readHead(DataInputStream dataInputStream) throws IOException {
+            long magic = dataInputStream.readLong();
+            if (magic != MAGIC) {
+                throw new RuntimeException("This file is not file index file.");
+            }
+
+            int version = dataInputStream.readInt();
+            if (version != Version.V_1.version()) {
+                throw new RuntimeException(
+                        "This index file is version of "
+                                + version
+                                + ", not in supported version list ["
+                                + Version.V_1.version()
+                                + "]");
+            }
+
+            int headLength = dataInputStream.readInt();
+            if (headLength < HEAD_PREFIX_LENGTH) {
+                throw new RuntimeException(
+                        "Invalid head length " + headLength + " in file index file.");
+            }
+            // The head length covers the prefix that has already been consumed.
+            byte[] head = new byte[headLength - HEAD_PREFIX_LENGTH];
+            dataInputStream.readFully(head);
+
+            try (DataInputStream dataInput =
+                    new DataInputStream(new ByteArrayInputStream(head))) {
+                String indexType = dataInput.readUTF();
+                int columnSize = dataInput.readInt();
+                for (int i = 0; i < columnSize; i++) {
+                    String columnName = dataInput.readUTF();
+                    int startPosition = dataInput.readInt();
+                    int length = dataInput.readInt();
+                    this.header.put(columnName, Pair.of(startPosition, length));
+                }
+                return indexType;
+            }
+        }
+
+        /**
+         * Creates the index reader of a column from its serialized bytes.
+         *
+         * @return the reader, or {@code null} if the file holds no index for the column
+         * @throws IllegalArgumentException if the column is indexed but is not a field of the
+         *     row type given to this reader
+         */
+        public FileIndexReader readColumnIndex(String columnName) {
+            return readColumnInputStream(columnName)
+                    .map(
+                            serializedBytes -> {
+                                DataField field = fields.get(columnName);
+                                if (field == null) {
+                                    throw new IllegalArgumentException(
+                                            "Column "
+                                                    + columnName
+                                                    + " is indexed but is not in the file row type.");
+                                }
+                                return FileIndexer.create(type, field.type())
+                                        .createReader()
+                                        .recoverFrom(serializedBytes);
+                            })
+                    .orElse(null);
+        }
+
+        /** Reads the raw index bytes of a column; empty if the head has no such column. */
+        @VisibleForTesting
+        Optional<byte[]> readColumnInputStream(String columnName) {
+            return Optional.ofNullable(header.get(columnName))
+                    .map(
+                            startAndLength -> {
+                                byte[] b = new byte[startAndLength.getRight()];
+                                try {
+                                    seekableInputStream.seek(startAndLength.getLeft());
+                                    int n = 0;
+                                    int len = b.length;
+                                    // read fully until b is full else throw.
+                                    while (n < len) {
+                                        int count = seekableInputStream.read(b, n, len - n);
+                                        if (count < 0) {
+                                            throw new EOFException();
+                                        }
+                                        n += count;
+                                    }
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                                return b;
+                            });
+        }
+
+        @Override
+        public void close() throws IOException {
+            IOUtils.closeQuietly(seekableInputStream);
+        }
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
new file mode 100644
index 000000000..b07c6b8f0
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Or;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utils to check secondary index (e.g. bloom filter) predicate.
+ *
+ * <p>{@link #testPredicate} returns {@code false} only when the file index shows that the file
+ * can be skipped for the predicate; {@code true} means the file has to be read.
+ */
+public class FileIndexPredicate implements Closeable {
+
+    private final FileIndexFormat.Reader reader;
+    // Per-column test workers, created lazily and reused across testPredicate calls.
+    private final Map<String, FileIndexFieldPredicate> fieldPredicates = new HashMap<>();
+
+    public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) throws IOException {
+        this(fileIO.newInputStream(path), fileRowType);
+    }
+
+    public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
+        this(new ByteArraySeekableStream(serializedBytes), fileRowType);
+    }
+
+    public FileIndexPredicate(SeekableInputStream inputStream, RowType fileRowType) {
+        this.reader = FileIndexFormat.createReader(inputStream, fileRowType);
+    }
+
+    /**
+     * Tests the predicate against the index of every column it references.
+     *
+     * @param filePredicate predicate to test, {@code null} means no filter
+     * @return {@code false} if some column index rules the file out, {@code true} otherwise
+     */
+    public boolean testPredicate(@Nullable Predicate filePredicate) {
+        if (filePredicate == null) {
+            return true;
+        }
+
+        Set<String> requiredFieldNames = getRequiredNames(filePredicate);
+
+        List<FileIndexFieldPredicate> testWorkers =
+                requiredFieldNames.stream()
+                        .map(
+                                cname ->
+                                        fieldPredicates.computeIfAbsent(
+                                                cname,
+                                                k ->
+                                                        new FileIndexFieldPredicate(
+                                                                cname,
+                                                                reader.readColumnIndex(cname))))
+                        .collect(Collectors.toList());
+
+        for (FileIndexFieldPredicate testWorker : testWorkers) {
+            if (!testWorker.test(filePredicate)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Collects the field names of all leaf predicates inside the given predicate. */
+    private Set<String> getRequiredNames(Predicate filePredicate) {
+        return filePredicate.visit(
+                new PredicateVisitor<Set<String>>() {
+                    final Set<String> names = new HashSet<>();
+
+                    @Override
+                    public Set<String> visit(LeafPredicate predicate) {
+                        names.add(predicate.fieldName());
+                        return names;
+                    }
+
+                    @Override
+                    public Set<String> visit(CompoundPredicate predicate) {
+                        for (Predicate child : predicate.children()) {
+                            child.visit(this);
+                        }
+                        return names;
+                    }
+                });
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.reader.close();
+    }
+
+    /**
+     * Predicate test worker of a single column. Leaves on other columns are treated as "may
+     * match" so that only this column's index can rule the file out.
+     */
+    private static class FileIndexFieldPredicate implements PredicateVisitor<Boolean> {
+
+        private final String columnName;
+        // null when the index file has no index for this column
+        @Nullable private final FileIndexReader fileIndexReader;
+
+        public FileIndexFieldPredicate(
+                String columnName, @Nullable FileIndexReader fileIndexReader) {
+            this.columnName = columnName;
+            this.fileIndexReader = fileIndexReader;
+        }
+
+        public Boolean test(Predicate predicate) {
+            if (fileIndexReader == null) {
+                // Without an index nothing can be ruled out.
+                return true;
+            }
+            return predicate.visit(this);
+        }
+
+        @Override
+        public Boolean visit(LeafPredicate predicate) {
+            if (fileIndexReader == null || !columnName.equals(predicate.fieldName())) {
+                return true;
+            }
+            return predicate
+                    .function()
+                    .visit(
+                            fileIndexReader,
+                            new FieldRef(
+                                    predicate.index(), predicate.fieldName(), predicate.type()),
+                            predicate.literals());
+        }
+
+        @Override
+        public Boolean visit(CompoundPredicate predicate) {
+
+            if (predicate.function() instanceof Or) {
+                // OR: the file may match as soon as one child may match.
+                for (Predicate predicate1 : predicate.children()) {
+                    if (predicate1.visit(this)) {
+                        return true;
+                    }
+                }
+                return false;
+
+            } else {
+                // AND: one child that cannot match rules the file out.
+                for (Predicate predicate1 : predicate.children()) {
+                    if (!predicate1.visit(this)) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
new file mode 100644
index 000000000..6d9404564
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FunctionVisitor;
+
+import java.util.List;
+
+/**
+ * Read file index from serialized bytes. Return true, means we need to search this file, else
+ * means needn't.
+ *
+ * <p>Every default answers {@code true} (the file cannot be ruled out); an implementation
+ * overrides only the functions its index is able to answer.
+ */
+public interface FileIndexReader extends FunctionVisitor<Boolean> {
+
+    /** Restores the index from its serialized bytes and returns the reader to query. */
+    FileIndexReader recoverFrom(byte[] serializedBytes);
+
+    @Override
+    default Boolean visitIsNotNull(FieldRef fieldRef) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitIsNull(FieldRef fieldRef) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitEqual(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    @Override
+    default Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return true;
+    }
+
+    /** IN is a disjunction of equalities: search the file if any literal may be present. */
+    @Override
+    default Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
+        for (Object key : literals) {
+            if (visitEqual(fieldRef, key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * NOT IN is a conjunction of inequalities: the file can be skipped as soon as one of them
+     * cannot match. An empty literal list excludes nothing, so the file has to be searched.
+     */
+    @Override
+    default Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        for (Object key : literals) {
+            if (!visitNotEqual(fieldRef, key)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    default Boolean visitAnd(List<Boolean> children) {
+        throw new UnsupportedOperationException("Should not invoke this");
+    }
+
+    @Override
+    default Boolean visitOr(List<Boolean> children) {
+        throw new UnsupportedOperationException("Should not invoke this");
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
new file mode 100644
index 000000000..9eab19cde
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexWriter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+/** To write file index. */
+public interface FileIndexWriter {
+
+ /**
+ * Feeds one key to the index. NOTE(review): presumably called once per value of the indexed
+ * column; whether null keys are allowed is not visible here - confirm with implementations.
+ */
+ void write(Object key);
+
+ /**
+ * Returns the index as bytes. NOTE(review): presumably the bytes that
+ * {@code FileIndexReader#recoverFrom} later receives - confirm.
+ */
+ byte[] serializedBytes();
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
new file mode 100644
index 000000000..e7e3d40bf
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.types.DataType;
+
+/** File index interface. To build a file index. */
+public interface FileIndexer {
+
+ /** Creates the writer of this index. */
+ FileIndexWriter createWriter();
+
+ /** Creates the reader of this index. */
+ FileIndexReader createReader();
+
+ /**
+ * Creates the indexer for the given index type. The switch has no case besides the default
+ * one, so every non-null type currently ends in the exception below.
+ */
+ static FileIndexer create(String type, DataType dataType) {
+ switch (type) {
+ default:
+ throw new RuntimeException("Doesn't support filter type: " +
type);
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/ByteArraySeekableStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/ByteArraySeekableStream.java
new file mode 100644
index 000000000..d6536927b
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/ByteArraySeekableStream.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Wrap byte buf to a seekable input stream.
+ *
+ * <p>All operations are delegated to an in-memory {@link ByteArrayInputStream} whose read
+ * position can be moved with {@link #seek(long)}.
+ */
+public class ByteArraySeekableStream extends SeekableInputStream {
+
+    private final ByteArrayStream byteArrayStream;
+
+    public ByteArraySeekableStream(byte[] buf) {
+        this.byteArrayStream = new ByteArrayStream(buf);
+    }
+
+    /**
+     * Moves the read position.
+     *
+     * @param desired new position, must be in {@code [0, length)}
+     * @throws EOFException if the position is negative or not smaller than the buffer length
+     */
+    @Override
+    public void seek(long desired) throws IOException {
+        byteArrayStream.seek(desired);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return byteArrayStream.getPos();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return byteArrayStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return byteArrayStream.read(b, off, len);
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return byteArrayStream.read(b);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        return byteArrayStream.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+        return byteArrayStream.available();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        byteArrayStream.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        byteArrayStream.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return byteArrayStream.markSupported();
+    }
+
+    @Override
+    public void close() throws IOException {
+        byteArrayStream.close();
+    }
+
+    /** Gives access to the protected {@code pos} and {@code count} of the byte array stream. */
+    private static class ByteArrayStream extends ByteArrayInputStream {
+        public ByteArrayStream(byte[] buf) {
+            super(buf);
+        }
+
+        public void seek(long position) throws IOException {
+            // Validate on the long value: casting first would wrap a large offset into a
+            // valid-looking but wrong position, and a negative one would corrupt pos.
+            if (position < 0 || position >= count) {
+                throw new EOFException("Can't seek position: " + position + ", length is " + count);
+            }
+            pos = (int) position;
+        }
+
+        public long getPos() {
+            return pos;
+        }
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
new file mode 100644
index 000000000..0f157ae99
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.types.RowType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.paimon.utils.RandomUtil.randomBytes;
+import static org.apache.paimon.utils.RandomUtil.randomString;
+
+/** Test for {@link FileIndexFormat}. */
+public class FileIndexFormatFormatTest {
+
+    private static final Random RANDOM = new Random();
+
+    /** Writes random column bytes and checks that every column is read back unchanged. */
+    @Test
+    public void testWriteRead() throws IOException {
+        String type = randomString(RANDOM.nextInt(100));
+        Map<String, byte[]> indexes = new HashMap<>();
+        // The bound is re-drawn on every iteration on purpose: it keeps the number of columns,
+        // and therefore the amount of random data, small.
+        for (int i = 0; i < RANDOM.nextInt(1000); i++) {
+            indexes.put(randomString(RANDOM.nextInt(20)), randomBytes(RANDOM.nextInt(100000)));
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
+            writer.writeColumnIndex(type, indexes);
+        }
+
+        byte[] indexBytes = baos.toByteArray();
+
+        try (FileIndexFormat.Reader reader =
+                FileIndexFormat.createReader(
+                        new ByteArraySeekableStream(indexBytes), RowType.builder().build())) {
+            for (String s : indexes.keySet()) {
+                byte[] b = reader.readColumnInputStream(s).orElseThrow(RuntimeException::new);
+                Assertions.assertThat(b).containsExactly(indexes.get(s));
+            }
+        }
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java
new file mode 100644
index 000000000..e725e2a3d
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.paimon.utils.RandomUtil.randomBytes;
+
+/** Test for {@link ByteArraySeekableStream}. */
+public class ByteArraySeekableStreamTest {
+
+    private static final Random RANDOM = new Random();
+
+    /** Random seeks followed by bulk reads and by single-byte reads. */
+    @Test
+    public void testBasic() throws IOException {
+        int bl = 100000;
+        byte[] b = randomBytes(bl);
+        ByteArraySeekableStream byteArraySeekableStream = new ByteArraySeekableStream(b);
+
+        Assertions.assertThat(byteArraySeekableStream.available()).isEqualTo(b.length);
+
+        int bulkRounds = RANDOM.nextInt(1000);
+        for (int i = 0; i < bulkRounds; i++) {
+            int position = RANDOM.nextInt(bl);
+            // The bound is at least 2, so seeking to the last index can no longer produce
+            // nextInt(0); the length may reach exactly to the end of the buffer.
+            int length = RANDOM.nextInt(bl - position + 1);
+            byte[] expected = new byte[length];
+            System.arraycopy(b, position, expected, 0, length);
+
+            byte[] actual = new byte[length];
+            byteArraySeekableStream.seek(position);
+            Assertions.assertThat(byteArraySeekableStream.read(actual)).isEqualTo(length);
+            Assertions.assertThat(actual).containsExactly(expected);
+        }
+
+        int run = 100;
+        int singleRounds = RANDOM.nextInt(1000);
+        for (int i = 0; i < singleRounds; i++) {
+            // Leave room for `run` bytes after the position, otherwise b[position + j] would
+            // run past the end of the array.
+            int position = RANDOM.nextInt(bl - run + 1);
+            byteArraySeekableStream.seek(position);
+            for (int j = 0; j < run; j++) {
+                Assertions.assertThat(b[position + j])
+                        .isEqualTo((byte) byteArraySeekableStream.read());
+            }
+        }
+    }
+
+    /** Seeking to the buffer length is rejected. */
+    @Test
+    public void testThrow() {
+        int bl = 10;
+        byte[] b = randomBytes(bl);
+        ByteArraySeekableStream byteArraySeekableStream = new ByteArraySeekableStream(b);
+        Assertions.assertThatCode(() -> byteArraySeekableStream.seek(10))
+                .hasMessage("Can't seek position: 10, length is 10");
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/RandomUtil.java
b/paimon-common/src/test/java/org/apache/paimon/utils/RandomUtil.java
new file mode 100644
index 000000000..e6d6805b2
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/RandomUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Random;
+
+/** Utils for tests. */
+public class RandomUtil {
+
+    private static final Random RANDOM = new Random();
+
+    private RandomUtil() {
+        // static helpers only
+    }
+
+    /** Returns {@code length} random bytes. */
+    public static byte[] randomBytes(int length) {
+        byte[] b = new byte[length];
+        RANDOM.nextBytes(b);
+        return b;
+    }
+
+    /** Returns a random string of {@code length} characters drawn from {@code 'a'..'z'}. */
+    public static String randomString(int length) {
+        // Built from chars so that the result does not depend on the platform charset.
+        char[] buffer = new char[length];
+
+        for (int i = 0; i < length; i += 1) {
+            buffer[i] = (char) ('a' + RANDOM.nextInt(26));
+        }
+
+        return new String(buffer);
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
index 801a1927d..c94327817 100644
--- a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
@@ -34,6 +34,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
+import static org.apache.paimon.utils.RandomUtil.randomString;
+
/** Tests for {@link ZIndexer}. */
public class ZIndexerTest {
@@ -135,14 +137,4 @@ public class ZIndexerTest {
}
}
}
-
- public static String randomString(int length) {
- byte[] buffer = new byte[length];
-
- for (int i = 0; i < length; i += 1) {
- buffer[i] = (byte) ('a' + RANDOM.nextInt(26));
- }
-
- return new String(buffer);
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
index dded0cf7e..92786b701 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
@@ -26,6 +26,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
+import static org.apache.paimon.utils.RandomUtil.randomBytes;
+import static org.apache.paimon.utils.RandomUtil.randomString;
+
/* This file is based on source code from the Iceberg Project
(http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
@@ -352,8 +355,8 @@ public class TestZOrderByteUtil {
ByteBuffer aBuffer = ByteBuffer.allocate(128);
ByteBuffer bBuffer = ByteBuffer.allocate(128);
for (int i = 0; i < NUM_TESTS; i++) {
- String aString = randomString();
- String bString = randomString();
+ String aString = randomString(50);
+ String bString = randomString(50);
int stringCompare = Integer.signum(aString.compareTo(bString));
byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128,
aBuffer).array();
byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128,
bBuffer).array();
@@ -380,8 +383,8 @@ public class TestZOrderByteUtil {
ByteBuffer aBuffer = ByteBuffer.allocate(128);
ByteBuffer bBuffer = ByteBuffer.allocate(128);
for (int i = 0; i < NUM_TESTS; i++) {
- byte[] aBytesRaw = randomBytes();
- byte[] bBytesRaw = randomBytes();
+ byte[] aBytesRaw = randomBytes(50);
+ byte[] bBytesRaw = randomBytes(50);
int stringCompare =
Integer.signum(
UnsignedBytes.lexicographicalComparator()
@@ -405,21 +408,4 @@ public class TestZOrderByteUtil {
byteCompare);
}
}
-
- private byte[] randomBytes() {
- byte[] binary = new byte[random.nextInt(50)];
- random.nextBytes(binary);
- return binary;
- }
-
- private String randomString() {
- int length = random.nextInt(50);
- byte[] buffer = new byte[length];
-
- for (int i = 0; i < length; i += 1) {
- buffer[i] = (byte) ('a' + random.nextInt(26));
- }
-
- return new String(buffer);
- }
}