This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 76c4f55c57b1 fix: ensure that InlineFS is seeked to the correct offset upon init (#14178)
76c4f55c57b1 is described below
commit 76c4f55c57b14d372f2bd92f2aa4001e3ac0e75f
Author: voonhous <[email protected]>
AuthorDate: Thu Oct 30 17:18:37 2025 +0800
fix: ensure that InlineFS is seeked to the correct offset upon init (#14178)
---
.../hudi/hadoop/fs/inline/InLineFileSystem.java | 23 ++
.../apache/hudi/storage/inline/InLineFSUtils.java | 9 +
.../hudi/io/InlineSeekableDataInputStream.java | 22 ++
.../hudi/io/TestInlineSeekableDataInputStream.java | 244 +++++++++++++++++++++
4 files changed, 298 insertions(+)
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
index 9296b7178999..dab1bc9f90e3 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
@@ -43,6 +43,29 @@ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
* - Reading an inlined file at a given offset, length, read it out as if it
were an independent file of that length
* - Inlined path is of the form
"inlinefs:///path/to/outer/file/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>
* <p>
+ * Example:
+ * <pre>
+ * inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+ * .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
+ * </pre>
+ * <p>
+ * In this example:
+ * <ul>
+ * <li>The outer file path is: {@code tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/
+ * column_stats/.col-stats-0001-0_20250429145946675.log.1_1-120-382}</li>
+ * <li>The outer file scheme is: {@code local} (representing local file system)</li>
+ * <li>The inline content starts at byte offset: {@code 8036}</li>
+ * <li>The inline content length is: {@code 6959} bytes</li>
+ * </ul>
+ * <p>
+ * When this path is opened, the file system will:
+ * <ol>
+ * <li>Extract the outer file path and scheme from the URL</li>
+ * <li>Open the outer file using the appropriate file system</li>
+ * <li>Seek to the start_offset (8036 in the example)</li>
+ * <li>Present the next 'length' bytes (6959 in the example) as if they were an independent file</li>
+ * </ol>
+ * <p>
+ * <p>
* TODO: The reader/writer may try to use relative paths based on the
inlinepath and it may not work. Need to handle
* this gracefully eg. the parquet summary metadata reading. TODO: If this
shows promise, also support directly writing
* the inlined file to the underneath file without buffer
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java b/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java
index 97b8de500509..13aade72865a 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java
@@ -38,9 +38,18 @@ public class InLineFSUtils {
* Get the InlineFS Path for a given schema and its Path.
* <p>
* Examples:
+ * <pre>
* Input Path: s3a://file1, origScheme: file, startOffset = 20, length = 40
* Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
*
+ * Real-world example:
+ * Input Path: tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+ * .col-stats-0001-0_20250429145946675.log.1_1-120-382
+ * origScheme: local, startOffset = 8036, length = 6959
+ * Output: "inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+ * .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959"
+ * </pre>
+ *
* @param outerPath The outer file path
* @param origScheme The file schema
* @param inLineStartOffset Start offset for the inline file
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
index 5b1c322e0a39..f75ec7a55f90 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
@@ -17,6 +17,26 @@ import io.trino.filesystem.TrinoInputStream;
import java.io.IOException;
+/**
+ * A seekable data input stream that reads inline content from an outer file at a specific offset and length.
+ * <p>
+ * This class is used when reading inline files stored within larger files (e.g., log blocks embedded in Hudi log files).
+ * It provides a view of a segment of the underlying stream as if it were an independent file.
+ * <p>
+ * Example InlineFS URL:
+ * <pre>
+ * inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+ * .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
+ * </pre>
+ * <p>
+ * Key behaviors:
+ * <ul>
+ * <li>Upon initialization, the underlying stream is immediately seeked to the start offset to ensure correct positioning</li>
+ * <li>{@link #getPos()} returns positions relative to the start offset (0-based from the inline content start)</li>
+ * <li>{@link #seek(long)} accepts positions relative to the start offset and translates them to absolute positions</li>
+ * <li>Attempting to seek beyond the length throws an {@link IOException}</li>
+ * </ul>
+ */
public class InlineSeekableDataInputStream
extends TrinoSeekableDataInputStream
{
@@ -24,10 +44,12 @@ public class InlineSeekableDataInputStream
private final long length;
public InlineSeekableDataInputStream(TrinoInputStream stream, long
startOffset, long length)
+ throws IOException // stream.seek(startOffset) below may throw
{
super(stream);
this.startOffset = startOffset;
this.length = length;
+ stream.seek(startOffset); // Position the underlying stream at the first inline byte so the first read returns the byte at startOffset
}
@Override
diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java
new file mode 100644
index 000000000000..b4656834a6af
--- /dev/null
+++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.io;
+
+import io.trino.filesystem.TrinoInputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link InlineSeekableDataInputStream}: start-offset positioning, relative seek/getPos translation and reads. */
+class TestInlineSeekableDataInputStream
+{
+    public static final String CONST_STR_FOR_BYTES = "0123456789ABCDEFGHIJ";
+
+    @Test
+    void testStreamIsSeekableToStartOffsetUponInitialization()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5;
+        long length = 10;
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Verify the stream was seeked to the startOffset during initialization
+        assertThat(stream.getPosition()).isEqualTo(startOffset);
+
+        // Verify getPos() returns 0 (relative to startOffset)
+        assertThat(inlineStream.getPos()).isEqualTo(0);
+    }
+
+    @Test
+    void testGetPosReturnsRelativePosition()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 3;
+        long length = 10;
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Initially at position 0 (relative)
+        assertThat(inlineStream.getPos()).isEqualTo(0);
+
+        // Seek to position 5 (relative)
+        inlineStream.seek(5);
+        assertThat(inlineStream.getPos()).isEqualTo(5);
+
+        // Verify underlying stream is at startOffset + 5
+        assertThat(stream.getPosition()).isEqualTo(startOffset + 5);
+    }
+
+    @Test
+    void testSeekWithinBounds()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 2;
+        long length = 8;
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Seek to the middle
+        inlineStream.seek(4);
+        assertThat(inlineStream.getPos()).isEqualTo(4);
+        assertThat(stream.getPosition()).isEqualTo(startOffset + 4);
+
+        // Seeking to exactly `length` (one past the last inline byte, i.e. EOF of the inline view) is allowed
+        inlineStream.seek(8);
+        assertThat(inlineStream.getPos()).isEqualTo(8);
+        assertThat(stream.getPosition()).isEqualTo(startOffset + 8);
+    }
+
+    @Test
+    void testSeekPastLengthThrowsException()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5;
+        long length = 10;
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Attempting to seek past the length should throw IOException
+        assertThatThrownBy(() -> inlineStream.seek(11))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Attempting to seek past inline content")
+                .hasMessageContaining("position to seek to is 11")
+                .hasMessageContaining("but the length is 10");
+    }
+
+    @Test
+    void testReadDataAtCorrectOffset()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5; // Start at '5'
+        long length = 5; // Read 5 bytes: "56789"
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // The first byte read should be '5' (the byte at absolute position startOffset)
+        int firstByte = inlineStream.read();
+        assertThat(firstByte).isEqualTo('5');
+        assertThat(inlineStream.getPos()).isEqualTo(1);
+
+        // The next byte read should be '6'
+        int secondByte = inlineStream.read();
+        assertThat(secondByte).isEqualTo('6');
+        assertThat(inlineStream.getPos()).isEqualTo(2);
+    }
+
+    @Test
+    void testSeekAndRead()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 10; // Start at 'A'
+        long length = 10; // Length covers "ABCDEFGHIJ"
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Seek to position 3 (relative) - should be at 'D' (position 13 absolute)
+        inlineStream.seek(3);
+        assertThat(inlineStream.getPos()).isEqualTo(3);
+
+        // The next read should return 'D'
+        int readByte = inlineStream.read();
+        assertThat(readByte).isEqualTo('D');
+        assertThat(inlineStream.getPos()).isEqualTo(4);
+    }
+
+    @Test
+    void testZeroLengthInlineStream()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5;
+        long length = 0;
+
+        InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Position should be 0
+        assertThat(inlineStream.getPos()).isEqualTo(0);
+
+        // Seeking to any position > 0 should fail
+        assertThatThrownBy(() -> inlineStream.seek(1))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Attempting to seek past inline content");
+    }
+
+    /** In-memory {@link TrinoInputStream} over a fixed byte array; exposes the absolute position so tests can observe seeks. */
+    private static class TestTrinoInputStream
+            extends TrinoInputStream
+    {
+        private final byte[] data;
+        private int position;
+
+        public TestTrinoInputStream(byte[] data)
+        {
+            this.data = data;
+        }
+
+        @Override
+        public long getPosition()
+        {
+            return position;
+        }
+
+        @Override
+        public void seek(long position)
+                throws IOException
+        {
+            if (position < 0 || position > data.length) {
+                throw new IOException("Invalid seek position: " + position);
+            }
+            this.position = (int) position;
+        }
+
+        @Override
+        public int read()
+                throws IOException
+        {
+            if (position >= data.length) {
+                return -1;
+            }
+            return data[position++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len)
+                throws IOException
+        {
+            // Honor the InputStream contract: validate the range and return 0 (not -1) for zero-length reads
+            java.util.Objects.checkFromIndexSize(off, len, b.length);
+            if (len == 0) {
+                return 0;
+            }
+            if (position >= data.length) {
+                return -1;
+            }
+            int toRead = Math.min(len, data.length - position);
+            System.arraycopy(data, position, b, off, toRead);
+            position += toRead;
+            return toRead;
+        }
+
+        @Override
+        public void close()
+                throws IOException
+        {
+            // No-op for test
+        }
+    }
+}