This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 76c4f55c57b1 fix: ensure that InlineFS is seeked to the correct offset 
upon init (#14178)
76c4f55c57b1 is described below

commit 76c4f55c57b14d372f2bd92f2aa4001e3ac0e75f
Author: voonhous <[email protected]>
AuthorDate: Thu Oct 30 17:18:37 2025 +0800

    fix: ensure that InlineFS is seeked to the correct offset upon init (#14178)
---
 .../hudi/hadoop/fs/inline/InLineFileSystem.java    |  23 ++
 .../apache/hudi/storage/inline/InLineFSUtils.java  |   9 +
 .../hudi/io/InlineSeekableDataInputStream.java     |  22 ++
 .../hudi/io/TestInlineSeekableDataInputStream.java | 244 +++++++++++++++++++++
 4 files changed, 298 insertions(+)

diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
index 9296b7178999..dab1bc9f90e3 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java
@@ -43,6 +43,29 @@ import static 
org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
  * - Reading an inlined file at a given offset, length, read it out as if it 
were an independent file of that length
  * - Inlined path is of the form 
"inlinefs:///path/to/outer/file/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>
  * <p>
+ * Example:
+ * <pre>
+ * 
inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+ * 
.col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
+ * </pre>
+ * <p>
+ * In this example:
+ * <ul>
+ *   <li>The outer file path is: {@code 
tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/
+ *       column_stats/.col-stats-0001-0_20250429145946675.log.1_1-120-382}</li>
+ *   <li>The outer file scheme is: {@code local} (representing local file 
system)</li>
+ *   <li>The inline content starts at byte offset: {@code 8036}</li>
+ *   <li>The inline content length is: {@code 6959} bytes</li>
+ * </ul>
+ * <p>
+ * When this path is opened, the file system will:
+ * <ol>
+ *   <li>Extract the outer file path and scheme from the URL</li>
+ *   <li>Open the outer file using the appropriate file system</li>
+ *   <li>Seek to the start_offset (8036 in the example)</li>
+ *   <li>Present the next 'length' bytes (6959 in the example) as if they were 
an independent file</li>
+ * </ol>
+ * <p>
  * TODO: The reader/writer may try to use relative paths based on the 
inlinepath and it may not work. Need to handle
  * this gracefully eg. the parquet summary metadata reading. TODO: If this 
shows promise, also support directly writing
  * the inlined file to the underneath file without buffer
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java
index 97b8de500509..13aade72865a 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java
@@ -38,9 +38,18 @@ public class InLineFSUtils {
    * Get the InlineFS Path for a given schema and its Path.
    * <p>
    * Examples:
+   * <pre>
    * Input Path: s3a://file1, origScheme: s3a, startOffset = 20, length = 40
    * Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
    *
+   * Real-world example:
+   * Input Path: 
tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+   *             .col-stats-0001-0_20250429145946675.log.1_1-120-382
+   * origScheme: local, startOffset = 8036, length = 6959
+   * Output: 
"inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+   *          
.col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959"
+   * </pre>
+   *
    * @param outerPath         The outer file path
    * @param origScheme        The file schema
    * @param inLineStartOffset Start offset for the inline file
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
index 5b1c322e0a39..f75ec7a55f90 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
@@ -17,6 +17,26 @@ import io.trino.filesystem.TrinoInputStream;
 
 import java.io.IOException;
 
+/**
+ * A seekable data input stream that reads inline content from an outer file 
at a specific offset and length.
+ * <p>
+ * This class is used when reading inline files stored within larger files 
(e.g., log blocks embedded in Hudi log files).
+ * It provides a view of a segment of the underlying stream as if it were an 
independent file.
+ * <p>
+ * Example InlineFS URL:
+ * <pre>
+ * 
inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
+ * 
.col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
+ * </pre>
+ * <p>
+ * Key behaviors:
+ * <ul>
+ *   <li>Upon initialization, the underlying stream is immediately seeked to 
the start offset to ensure correct positioning</li>
+ *   <li>{@link #getPos()} returns positions relative to the start offset 
(0-based from the inline content start)</li>
+ *   <li>{@link #seek(long)} accepts positions relative to the start offset 
and translates them to absolute positions</li>
+ *   <li>Attempting to seek beyond the length throws an {@link 
IOException}</li>
+ * </ul>
+ */
 public class InlineSeekableDataInputStream
         extends TrinoSeekableDataInputStream
 {
@@ -24,10 +44,12 @@ public class InlineSeekableDataInputStream
     private final long length;
 
     public InlineSeekableDataInputStream(TrinoInputStream stream, long 
startOffset, long length)
+            throws IOException
     {
         super(stream);
         this.startOffset = startOffset;
         this.length = length;
+        stream.seek(startOffset);
     }
 
     @Override
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java
new file mode 100644
index 000000000000..b4656834a6af
--- /dev/null
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.io;
+
+import io.trino.filesystem.TrinoInputStream;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class TestInlineSeekableDataInputStream
+{
+    public static final String CONST_STR_FOR_BYTES = "0123456789ABCDEFGHIJ";
+
+    @Test
+    void testStreamIsSeekableToStartOffsetUponInitialization()
+            throws IOException
+    {
+        // Create a test stream with data at various positions
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5;
+        long length = 10;
+
+        // Initialize InlineSeekableDataInputStream
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Verify the stream was seeked to the startOffset during 
initialization
+        assertThat(stream.getPosition()).isEqualTo(startOffset);
+
+        // Verify getPos() returns 0 (relative to startOffset)
+        assertThat(inlineStream.getPos()).isEqualTo(0);
+    }
+
+    @Test
+    void testGetPosReturnsRelativePosition()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 3;
+        long length = 10;
+
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Initially at position 0 (relative)
+        assertThat(inlineStream.getPos()).isEqualTo(0);
+
+        // Seek to position 5 (relative)
+        inlineStream.seek(5);
+        assertThat(inlineStream.getPos()).isEqualTo(5);
+
+        // Verify underlying stream is at startOffset + 5
+        assertThat(stream.getPosition()).isEqualTo(startOffset + 5);
+    }
+
+    @Test
+    void testSeekWithinBounds()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 2;
+        long length = 8;
+
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Seek to the middle
+        inlineStream.seek(4);
+        assertThat(inlineStream.getPos()).isEqualTo(4);
+        assertThat(stream.getPosition()).isEqualTo(startOffset + 4);
+
+        // Seek to the end (a position equal to length is the end-of-content 
position, so this seek is expected to succeed)
+        inlineStream.seek(8);
+        assertThat(inlineStream.getPos()).isEqualTo(8);
+        assertThat(stream.getPosition()).isEqualTo(startOffset + 8);
+    }
+
+    @Test
+    void testSeekPastLengthThrowsException()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5;
+        long length = 10;
+
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Attempting to seek past the length should throw IOException
+        assertThatThrownBy(() -> inlineStream.seek(11))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Attempting to seek past inline content")
+                .hasMessageContaining("position to seek to is 11")
+                .hasMessageContaining("but the length is 10");
+    }
+
+    @Test
+    void testReadDataAtCorrectOffset()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5; // Start at '5'
+        long length = 5;      // Read 5 bytes: "56789"
+
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Read first byte should be '5' (byte at position startOffset)
+        int firstByte = inlineStream.read();
+        assertThat(firstByte).isEqualTo('5');
+        assertThat(inlineStream.getPos()).isEqualTo(1);
+
+        // Read next byte should be '6'
+        int secondByte = inlineStream.read();
+        assertThat(secondByte).isEqualTo('6');
+        assertThat(inlineStream.getPos()).isEqualTo(2);
+    }
+
+    @Test
+    void testSeekAndRead()
+            throws IOException
+    {
+        byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 10; // Start at 'A'
+        long length = 10;      // Length covers "ABCDEFGHIJ"
+
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Seek to position 3 (relative) - should be at 'D' (position 13 
absolute)
+        inlineStream.seek(3);
+        assertThat(inlineStream.getPos()).isEqualTo(3);
+
+        // Read should get 'D'
+        int readByte = inlineStream.read();
+        assertThat(readByte).isEqualTo('D');
+        assertThat(inlineStream.getPos()).isEqualTo(4);
+    }
+
+    @Test
+    void testZeroLengthInlineStream()
+            throws IOException
+    {
+        byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
+        TestTrinoInputStream stream = new TestTrinoInputStream(data);
+
+        long startOffset = 5;
+        long length = 0;
+
+        InlineSeekableDataInputStream inlineStream = new 
InlineSeekableDataInputStream(stream, startOffset, length);
+
+        // Position should be 0
+        assertThat(inlineStream.getPos()).isEqualTo(0);
+
+        // Seeking to any position > 0 should fail
+        assertThatThrownBy(() -> inlineStream.seek(1))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Attempting to seek past inline 
content");
+    }
+
+    /**
+     * Test implementation of TrinoInputStream for unit testing.
+     */
+    private static class TestTrinoInputStream
+            extends TrinoInputStream
+    {
+        private final byte[] data;
+        private int position;
+
+        public TestTrinoInputStream(byte[] data)
+        {
+            this.data = data;
+            this.position = 0;
+        }
+
+        @Override
+        public long getPosition()
+        {
+            return position;
+        }
+
+        @Override
+        public void seek(long position)
+                throws IOException
+        {
+            if (position < 0 || position > data.length) {
+                throw new IOException("Invalid seek position: " + position);
+            }
+            this.position = (int) position;
+        }
+
+        @Override
+        public int read()
+                throws IOException
+        {
+            if (position >= data.length) {
+                return -1;
+            }
+            return data[position++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len)
+                throws IOException
+        {
+            if (position >= data.length) {
+                return -1;
+            }
+            int available = data.length - position;
+            int toRead = Math.min(len, available);
+            System.arraycopy(data, position, b, off, toRead);
+            position += toRead;
+            return toRead;
+        }
+
+        @Override
+        public void close()
+                throws IOException
+        {
+            // No-op for test
+        }
+    }
+}

Reply via email to