This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a423d05c839 [FLINK-39603][s3] Optimize NativeS3InputStream via seeks 
and reduce IOPS
a423d05c839 is described below

commit a423d05c839e71bd49a6ae6bf831161e1af2893c
Author: Samrat <[email protected]>
AuthorDate: Mon May 11 19:17:40 2026 +0530

    [FLINK-39603][s3] Optimize NativeS3InputStream via seeks and reduce IOPS
---
 .../fs/s3native/NativeS3FileSystemFactory.java     |   8 +-
 .../flink/fs/s3native/NativeS3InputStream.java     | 168 ++++----
 .../flink/fs/s3native/NativeS3InputStreamTest.java | 469 +++++++++++++++++----
 3 files changed, 481 insertions(+), 164 deletions(-)

diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index b7c0de3acfc..f141125b625 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -371,15 +371,11 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                 maxConcurrentUploads);
 
         boolean useAsyncOperations = config.get(USE_ASYNC_OPERATIONS);
-
-        // Validate and clamp read buffer size to sensible range [64KB, 4MB]
-        // We clip rather than throw to provide flexibility while preventing 
extreme values
         int configuredReadBufferSize = config.get(READ_BUFFER_SIZE);
-        int readBufferSize =
-                Math.max(64 * 1024, Math.min(configuredReadBufferSize, 4 * 
1024 * 1024));
+        int readBufferSize = Math.max(256 * 1024, configuredReadBufferSize);
         if (readBufferSize != configuredReadBufferSize) {
             LOG.warn(
-                    "{} value {} was outside valid range [64KB, 4MB]. Using {} 
instead.",
+                    "{} value {} was below 256KB. Using {} instead.",
                     READ_BUFFER_SIZE.key(),
                     configuredReadBufferSize,
                     readBufferSize);
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
index 32368dd94f6..dd2a80e4a70 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
@@ -35,22 +35,20 @@ import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * S3 input stream with configurable read-ahead buffer, range-based requests 
for seek operations,
- * automatic stream reopening on errors, and lazy initialization to minimize 
memory footprint.
+ * S3 input stream with configurable read-ahead buffer, lazy seek, and 
automatic stream reopening.
  *
- * <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe 
concurrent access and
- * resource cleanup.
+ * <p>{@link #seek(long)} only records the desired position without performing 
any I/O. All HTTP
+ * work is deferred to the next {@link #read()} call via {@link #lazySeek()}, 
so multiple seeks
+ * between reads coalesce. When the seek is forward and within {@code 
readBufferSize}, bytes are
+ * skipped in-buffer instead of reopening the HTTP connection.
  */
 class NativeS3InputStream extends FSDataInputStream {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NativeS3InputStream.class);
 
-    /** Default read-ahead buffer size: 256KB. */
+    /** Default read-ahead buffer size. */
     private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024;
 
-    /** Maximum buffer size for very large sequential reads. */
-    private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
-
     private final ReentrantLock lock = new ReentrantLock();
 
     private final S3Client s3Client;
@@ -65,8 +63,19 @@ class NativeS3InputStream extends FSDataInputStream {
     @GuardedBy("lock")
     private BufferedInputStream bufferedStream;
 
+    /**
+     * The position the caller expects to read from next. Updated by {@link 
#seek(long)}, {@link
+     * #skip(long)}, and after every successful {@link #read()}.
+     */
+    @GuardedBy("lock")
+    private long nextReadPos;
+
+    /**
+     * The actual byte offset of the underlying stream cursor, reconciled 
lazily via {@link
+     * #lazySeek()}.
+     */
     @GuardedBy("lock")
-    private long position;
+    private long streamPos;
 
     @GuardedBy("lock")
     private volatile boolean closed;
@@ -86,8 +95,9 @@ class NativeS3InputStream extends FSDataInputStream {
         this.bucketName = bucketName;
         this.key = key;
         this.contentLength = contentLength;
-        this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE);
-        this.position = 0;
+        this.readBufferSize = readBufferSize;
+        this.nextReadPos = 0;
+        this.streamPos = 0;
         this.closed = false;
 
         LOG.debug(
@@ -98,39 +108,63 @@ class NativeS3InputStream extends FSDataInputStream {
                 this.readBufferSize / 1024);
     }
 
+    /** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading 
bytes. */
     @GuardedBy("lock")
-    private void lazyInitialize() throws IOException {
-        assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock 
to be held";
-        if (currentStream == null && !closed) {
-            openStreamAtCurrentPosition();
-        }
-    }
+    private void lazySeek() throws IOException {
+        assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be 
held";
+        long targetPos = nextReadPos;
 
-    /** At EOF, release instead of reopening: {@code bytes=contentLength-} 
returns S3 416. */
-    @GuardedBy("lock")
-    private void repositionOpenStream() throws IOException {
-        assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires 
lock to be held";
         if (currentStream == null) {
+            streamPos = targetPos;
             return;
         }
-        if (position >= contentLength) {
+
+        if (targetPos == streamPos) {
+            return;
+        }
+
+        long diff = targetPos - streamPos;
+        streamPos = targetPos;
+
+        if (targetPos >= contentLength) {
             releaseStreams();
-        } else {
+            return;
+        }
+
+        // BufferedInputStream does not expose how many bytes are in its local 
array, so
+        // readBufferSize is used as the skip threshold: at most 
readBufferSize bytes may be
+        // consumed from the live HTTP connection before a range request is 
preferred instead.
+        if (diff > 0 && diff <= (long) readBufferSize) {
+            skipBytesInBuffer(diff);
+            return;
+        }
+
+        openStreamAtCurrentPosition();
+    }
+
+    @GuardedBy("lock")
+    private void ensureStreamOpen() throws IOException {
+        assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires 
lock to be held";
+        if (currentStream == null && !closed) {
             openStreamAtCurrentPosition();
         }
     }
 
-    /**
-     * Opens (or reopens) the S3 stream at the current position.
-     *
-     * <p>This method:
-     *
-     * <ul>
-     *   <li>Closes any existing stream
-     *   <li>Opens a new stream starting at {@link #position}
-     *   <li>Uses HTTP range requests for non-zero positions
-     * </ul>
-     */
+    @GuardedBy("lock")
+    private void skipBytesInBuffer(long n) throws IOException {
+        assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires 
lock to be held";
+        long remaining = n;
+        while (remaining > 0) {
+            long skipped = bufferedStream.skip(remaining);
+            if (skipped <= 0) {
+                openStreamAtCurrentPosition();
+                return;
+            }
+            remaining -= skipped;
+        }
+    }
+
+    /** Opens (or reopens) the S3 stream at {@link #streamPos}. */
     private void openStreamAtCurrentPosition() throws IOException {
         lock.lock();
         try {
@@ -140,11 +174,11 @@ class NativeS3InputStream extends FSDataInputStream {
                 GetObjectRequest.Builder requestBuilder =
                         GetObjectRequest.builder().bucket(bucketName).key(key);
 
-                if (position > 0) {
-                    requestBuilder.range(String.format("bytes=%d-", position));
+                if (streamPos > 0) {
+                    requestBuilder.range(String.format("bytes=%d-", 
streamPos));
                     LOG.debug(
                             "Opening S3 stream with range: bytes={}-{}",
-                            position,
+                            streamPos,
                             contentLength - 1);
                 } else {
                     LOG.debug("Opening S3 stream for full object: {} bytes", 
contentLength);
@@ -160,12 +194,7 @@ class NativeS3InputStream extends FSDataInputStream {
         }
     }
 
-    /**
-     * Aborts the in-flight HTTP connection so that subsequent {@code close()} 
calls on the stream
-     * do not drain remaining bytes over the network.
-     *
-     * @see ResponseInputStream#abort()
-     */
+    /** Aborts the in-flight HTTP connection to avoid draining remaining bytes 
on close. */
     @GuardedBy("lock")
     private void abortCurrentStream() {
         assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires 
lock to be held";
@@ -179,11 +208,10 @@ class NativeS3InputStream extends FSDataInputStream {
     }
 
     /**
-     * Aborts and closes both streams, nulling the references. The abort is 
called first to prevent
-     * {@link ResponseInputStream#close()} from draining remaining bytes over 
the network.
+     * Aborts and closes both streams, nulling the references.
      *
      * @return the first {@link IOException} encountered (with subsequent ones 
added as suppressed),
-     *     or {@code null} if cleanup succeeded without errors
+     *     or {@code null} if cleanup succeeded
      */
     @GuardedBy("lock")
     private IOException releaseStreams() {
@@ -236,10 +264,7 @@ class NativeS3InputStream extends FSDataInputStream {
                                 + contentLength);
             }
 
-            if (desired != position) {
-                position = desired;
-                repositionOpenStream();
-            }
+            nextReadPos = desired;
         } finally {
             lock.unlock();
         }
@@ -249,7 +274,7 @@ class NativeS3InputStream extends FSDataInputStream {
     public long getPos() throws IOException {
         lock();
         try {
-            return position;
+            return nextReadPos;
         } finally {
             lock.unlock();
         }
@@ -262,13 +287,15 @@ class NativeS3InputStream extends FSDataInputStream {
             if (closed) {
                 throw new IOException("Stream is closed");
             }
-            if (position >= contentLength) {
+            if (nextReadPos >= contentLength) {
                 return -1;
             }
-            lazyInitialize();
+            lazySeek();
+            ensureStreamOpen();
             int data = bufferedStream.read();
             if (data != -1) {
-                position++;
+                nextReadPos++;
+                streamPos++;
             }
             return data;
         } finally {
@@ -295,15 +322,17 @@ class NativeS3InputStream extends FSDataInputStream {
             if (closed) {
                 throw new IOException("Stream is closed");
             }
-            if (position >= contentLength) {
+            if (nextReadPos >= contentLength) {
                 return -1;
             }
-            lazyInitialize();
-            long remaining = contentLength - position;
+            lazySeek();
+            ensureStreamOpen();
+            long remaining = contentLength - nextReadPos;
             int toRead = (int) Math.min(len, remaining);
             int bytesRead = bufferedStream.read(b, off, toRead);
             if (bytesRead > 0) {
-                position += bytesRead;
+                nextReadPos += bytesRead;
+                streamPos += bytesRead;
             }
             return bytesRead;
         } finally {
@@ -336,7 +365,7 @@ class NativeS3InputStream extends FSDataInputStream {
                     "Closed S3 input stream - bucket: {}, key: {}, final 
position: {}/{}",
                     bucketName,
                     key,
-                    position,
+                    nextReadPos,
                     contentLength);
             if (exception != null) {
                 throw exception;
@@ -346,16 +375,6 @@ class NativeS3InputStream extends FSDataInputStream {
         }
     }
 
-    /**
-     * Returns an estimate of the number of bytes that can be read without 
blocking.
-     *
-     * <p>This implementation returns the remaining bytes in the object based 
on content length and
-     * current position. Note that actual reads may still block due to network 
I/O, but this
-     * indicates how much data is logically available.
-     *
-     * @return the number of remaining bytes (capped at Integer.MAX_VALUE)
-     * @throws IOException if the stream has been closed
-     */
     @Override
     public int available() throws IOException {
         lock();
@@ -363,7 +382,7 @@ class NativeS3InputStream extends FSDataInputStream {
             if (closed) {
                 throw new IOException("Stream is closed");
             }
-            long remaining = contentLength - position;
+            long remaining = contentLength - nextReadPos;
             return (int) Math.min(remaining, Integer.MAX_VALUE);
         } finally {
             lock.unlock();
@@ -380,12 +399,9 @@ class NativeS3InputStream extends FSDataInputStream {
             if (n <= 0) {
                 return 0;
             }
-            long newPos = Math.min(position + n, contentLength);
-            long skipped = newPos - position;
-            if (newPos != position) {
-                position = newPos;
-                repositionOpenStream();
-            }
+            long newPos = Math.min(nextReadPos + n, contentLength);
+            long skipped = newPos - nextReadPos;
+            nextReadPos = newPos;
             return skipped;
         } finally {
             lock.unlock();
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
index 90591fcd94a..32316e18bbc 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
@@ -33,6 +33,7 @@ import java.io.InputStream;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -57,23 +58,47 @@ class NativeS3InputStreamTest {
         private final AtomicBoolean aborted = new AtomicBoolean();
         private final AtomicBoolean closed = new AtomicBoolean();
         private volatile boolean abortedBeforeClose;
+        private final int maxAvailable;
+        private final AtomicLong bytesRead = new AtomicLong();
+        private final AtomicLong bytesSkipped = new AtomicLong();
 
         TrackingInputStream(byte[] data, int offset) {
-            this.delegate = new ByteArrayInputStream(data, offset, data.length 
- offset);
+            this(data, offset, Integer.MAX_VALUE);
         }
 
-        TrackingInputStream(byte[] data) {
-            this(data, 0);
+        TrackingInputStream(byte[] data, int offset, int maxAvailable) {
+            this.delegate = new ByteArrayInputStream(data, offset, data.length 
- offset);
+            this.maxAvailable = maxAvailable;
         }
 
         @Override
         public int read() {
-            return delegate.read();
+            int b = delegate.read();
+            if (b != -1) {
+                bytesRead.incrementAndGet();
+            }
+            return b;
         }
 
         @Override
         public int read(byte[] b, int off, int len) {
-            return delegate.read(b, off, len);
+            int n = delegate.read(b, off, len);
+            if (n > 0) {
+                bytesRead.addAndGet(n);
+            }
+            return n;
+        }
+
+        @Override
+        public long skip(long n) {
+            long skipped = delegate.skip(n);
+            bytesSkipped.addAndGet(skipped);
+            return skipped;
+        }
+
+        @Override
+        public int available() {
+            return Math.min(delegate.available(), maxAvailable);
         }
 
         @Override
@@ -101,6 +126,39 @@ class NativeS3InputStreamTest {
         boolean wasAbortedBeforeClose() {
             return abortedBeforeClose;
         }
+
+        long bytesReadFromUnderlying() {
+            return bytesRead.get();
+        }
+
+        long bytesSkippedFromUnderlying() {
+            return bytesSkipped.get();
+        }
+    }
+
+    /**
+     * A {@link TrackingInputStream} whose first {@link #skip} call returns 
{@code 0}, simulating a
+     * stalled/closed underlying HTTP connection during in-buffer skipping.
+     */
+    private static final class FailFirstSkipTrackingInputStream extends 
TrackingInputStream {
+        private final AtomicBoolean firstSkipFailed = new AtomicBoolean();
+
+        FailFirstSkipTrackingInputStream(byte[] data, int offset, int 
maxAvailable) {
+            super(data, offset, maxAvailable);
+        }
+
+        @Override
+        public long skip(long n) {
+            if (firstSkipFailed.compareAndSet(false, true)) {
+                return 0;
+            }
+            return super.skip(n);
+        }
+    }
+
+    @FunctionalInterface
+    private interface StreamFactory {
+        TrackingInputStream create(byte[] data, int offset, int maxAvailable);
     }
 
     /** {@link S3Client} stub. */
@@ -108,9 +166,21 @@ class NativeS3InputStreamTest {
         private final byte[] data;
         private final AtomicInteger getObjectCalls = new AtomicInteger();
         private volatile TrackingInputStream lastStream;
+        private final int maxAvailable;
+        private final StreamFactory factory;
 
         StubS3Client(byte[] data) {
+            this(data, Integer.MAX_VALUE);
+        }
+
+        StubS3Client(byte[] data, int maxAvailable) {
+            this(data, maxAvailable, TrackingInputStream::new);
+        }
+
+        StubS3Client(byte[] data, int maxAvailable, StreamFactory factory) {
             this.data = data;
+            this.maxAvailable = maxAvailable;
+            this.factory = factory;
         }
 
         @Override
@@ -121,7 +191,7 @@ class NativeS3InputStreamTest {
             if (range != null && range.startsWith("bytes=")) {
                 offset = Integer.parseInt(range.substring(6, 
range.indexOf('-')));
             }
-            TrackingInputStream tracking = new TrackingInputStream(data, 
offset);
+            TrackingInputStream tracking = factory.create(data, offset, 
maxAvailable);
             lastStream = tracking;
             AbortableInputStream abortable = 
AbortableInputStream.create(tracking, tracking);
             return new ResponseInputStream<>(
@@ -145,6 +215,8 @@ class NativeS3InputStreamTest {
         }
     }
 
+    // --- close behavior ---
+
     @Test
     void closeAbortsUnderlyingStream() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
@@ -162,116 +234,200 @@ class NativeS3InputStreamTest {
             in.read();
         }
         TrackingInputStream stream = client.lastStream();
-        // abort() must be called to kill the HTTP connection (prevents drain)
         assertThat(stream.wasAborted()).isTrue();
-        // close() must still be called for SDK resource cleanup (connection 
pool return, etc.)
         assertThat(stream.wasClosed()).isTrue();
-        // abort() must happen BEFORE close() - otherwise close() drains 
remaining bytes
         assertThat(stream.wasAbortedBeforeClose()).isTrue();
     }
 
     @Test
-    void seekAbortsAndClosesOldStreamBeforeOpeningNew() throws Exception {
+    void closeWithoutReadNeverOpensStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {}
+        assertThat(client.getObjectCalls()).isEqualTo(0);
+    }
+
+    @Test
+    void doubleCloseIsIdempotent() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, 
DATA.length);
+        in.read();
+
+        in.close();
+        assertThat(client.getObjectCalls()).isEqualTo(1);
+        assertThat(client.lastStream().wasAborted()).isTrue();
+
+        in.close();
+        assertThat(client.getObjectCalls()).isEqualTo(1);
+        assertThat(client.lastStream().wasAborted()).isTrue();
+    }
+
+    // --- lazy seek: seek() does no I/O ---
+
+    @Test
+    void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.seek(50);
+            assertThat(in.getPos()).isEqualTo(50);
+            assertThat(client.getObjectCalls()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void seekDoesNotTriggerStreamOperations() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
             in.read();
             TrackingInputStream first = client.lastStream();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
 
-            in.seek(100);
+            in.seek(0);
+            assertThat(first.wasAborted()).isFalse();
+            assertThat(first.wasClosed()).isFalse();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(in.getPos()).isEqualTo(0);
+        }
+    }
 
-            // old stream must be aborted, closed, and in the correct order
-            assertThat(first.wasAborted()).isTrue();
-            assertThat(first.wasClosed()).isTrue();
-            assertThat(first.wasAbortedBeforeClose()).isTrue();
-            assertThat(client.getObjectCalls()).isEqualTo(2);
-            assertThat(in.getPos()).isEqualTo(100);
+    @Test
+    void multipleSeeksWithoutReadCoalesce() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            in.seek(200);
+            in.seek(0);
             in.seek(100);
-            assertThat(client.getObjectCalls()).isEqualTo(2);
+            in.seek(50);
+
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            assertThat(in.read()).isEqualTo(50);
+            assertThat(in.getPos()).isEqualTo(51);
         }
     }
 
+    // --- seek + read: forward within buffer ---
+
     @Test
-    void skipAbortsOldStreamAndOpensNew() throws Exception {
+    void seekForwardWithinBufferReusesStream() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
             in.read();
             TrackingInputStream first = client.lastStream();
-            assertThat(in.skip(100)).isEqualTo(100);
-            assertThat(first.wasAborted()).isTrue();
-            assertThat(first.wasClosed()).isTrue();
-            assertThat(first.wasAbortedBeforeClose()).isTrue();
-            assertThat(client.getObjectCalls()).isEqualTo(2);
-            // skip(0) and skip(negative) are no-ops
-            assertThat(in.skip(0)).isZero();
-            assertThat(in.skip(-5)).isZero();
-            assertThat(client.getObjectCalls()).isEqualTo(2);
+
+            in.seek(100);
+            assertThat(in.read()).isEqualTo(100);
+
+            assertThat(first.wasAborted()).isFalse();
+            assertThat(first.wasClosed()).isFalse();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(in.getPos()).isEqualTo(101);
+
+            in.seek(101);
+            assertThat(in.read()).isEqualTo(101);
+            assertThat(client.getObjectCalls()).isEqualTo(1);
         }
     }
 
     @Test
-    void closeWithoutReadNeverOpensStream() throws Exception {
+    void seekForwardWithinBufferReturnsCorrectData() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
-            // lazy init means no getObject call
+            assertThat(in.read()).isEqualTo(0);
+            assertThat(in.read()).isEqualTo(1);
+
+            in.seek(10);
+            assertThat(in.read()).isEqualTo(10);
+
+            in.seek(50);
+            byte[] buf = new byte[5];
+            assertThat(in.read(buf, 0, 5)).isEqualTo(5);
+            for (int i = 0; i < 5; i++) {
+                assertThat(buf[i]).isEqualTo(DATA[50 + i]);
+            }
+
+            in.seek(200);
+            assertThat(in.read()).isEqualTo(200 & 0xFF);
+
+            assertThat(client.getObjectCalls()).isEqualTo(1);
         }
-        assertThat(client.getObjectCalls()).isEqualTo(0);
     }
 
     @Test
-    void doubleCloseIsIdempotent() throws Exception {
+    void sequentialSmallSeeksReuseStream() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
-        NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, 
DATA.length);
-        in.read();
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            for (int i = 0; i < 100; i++) {
+                in.seek(i * 2);
+                assertThat(in.read()).isEqualTo((i * 2) & 0xFF);
+            }
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+        }
+    }
 
-        // Verify state after first close
-        in.close();
-        assertThat(client.getObjectCalls()).isEqualTo(1);
-        assertThat(client.lastStream().wasAborted()).isTrue();
+    // --- seek + read: forward beyond buffer ---
 
-        // Second close should be a no-op
-        in.close();
-        assertThat(client.getObjectCalls()).isEqualTo(1);
-        assertThat(client.lastStream().wasAborted()).isTrue();
+    @Test
+    void seekForwardBeyondBufferReopensStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA, 0);
+        int smallBuffer = 16;
+        try (NativeS3InputStream in =
+                new NativeS3InputStream(client, BUCKET, KEY, DATA.length, 
smallBuffer)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+
+            in.seek(1 + smallBuffer + 1);
+            assertThat(first.wasAborted()).isFalse();
+
+            assertThat(in.read()).isEqualTo(1 + smallBuffer + 1);
+            assertThat(first.wasAborted()).isTrue();
+            assertThat(first.wasClosed()).isTrue();
+            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+        }
     }
 
+    // --- seek + read: backward ---
+
     @Test
-    void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+    void seekBackwardReopensStreamOnRead() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
-            in.seek(50);
-            assertThat(in.getPos()).isEqualTo(50);
-            assertThat(client.getObjectCalls()).isEqualTo(0);
+            byte[] buf = new byte[50];
+            in.read(buf, 0, 50);
+            TrackingInputStream first = client.lastStream();
+
+            in.seek(10);
+            assertThat(first.wasAborted()).isFalse();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            assertThat(in.read()).isEqualTo(10);
+            assertThat(first.wasAborted()).isTrue();
+            assertThat(first.wasClosed()).isTrue();
+            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+            assertThat(in.getPos()).isEqualTo(11);
         }
     }
 
+    // --- seek to EOF ---
+
     @Test
-    void readAndSeekReturnCorrectData() throws Exception {
+    void seekToContentLengthReleasesStreamOnRead() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
-            // single-byte read
-            assertThat(in.read()).isEqualTo(0);
-            assertThat(in.getPos()).isEqualTo(1);
-            // bulk read returns correct bytes and advances position
-            byte[] buf = new byte[10];
-            assertThat(in.read(buf, 0, 10)).isEqualTo(10);
-            assertThat(in.getPos()).isEqualTo(11);
-            for (int i = 0; i < 10; i++) {
-                assertThat(buf[i]).isEqualTo(DATA[i + 1]);
-            }
-            // available() reflects remaining bytes
-            assertThat(in.available()).isEqualTo(DATA.length - 11);
-            // seek then read returns data at the seeked position
-            in.seek(200);
-            assertThat(in.read()).isEqualTo(200);
-            assertThat(in.getPos()).isEqualTo(201);
-            // partial read at EOF returns only remaining bytes
-            in.seek(250);
-            byte[] tail = new byte[20];
-            assertThat(in.read(tail, 0, 20)).isEqualTo(6);
-            assertThat(in.getPos()).isEqualTo(256);
-            // read past EOF
+            in.read();
+            TrackingInputStream first = client.lastStream();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            in.seek(DATA.length);
+            assertThat(first.wasAborted()).isFalse();
+
             assertThat(in.read()).isEqualTo(-1);
-            assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+            assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
+            assertThat(client.getObjectCalls()).isEqualTo(1);
         }
     }
 
@@ -297,34 +453,53 @@ class NativeS3InputStreamTest {
             assertThat(in.read(new byte[8], 0, 8)).isEqualTo(-1);
             assertThat(in.getPos()).isEqualTo(DATA.length);
             assertThat(in.available()).isZero();
-            // EOF short-circuits before lazyInitialize, so no range request 
is issued.
             assertThat(client.getObjectCalls()).isZero();
         }
     }
 
+    // --- skip (also lazy) ---
+
     @Test
-    void seekToContentLengthWithOpenStreamReleasesStream() throws Exception {
+    void skipForwardWithinBufferReusesStream() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
             in.read();
             TrackingInputStream first = client.lastStream();
+
+            assertThat(in.skip(50)).isEqualTo(50);
+            assertThat(in.getPos()).isEqualTo(51);
+
+            assertThat(first.wasAborted()).isFalse();
             assertThat(client.getObjectCalls()).isEqualTo(1);
 
-            in.seek(DATA.length);
+            assertThat(in.read()).isEqualTo(51);
+            assertThat(first.wasAborted()).isFalse();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+        }
+    }
 
+    @Test
+    void skipForwardBeyondBufferReopensStreamOnRead() throws Exception {
+        StubS3Client client = new StubS3Client(DATA, 0);
+        int smallBuffer = 16;
+        try (NativeS3InputStream in =
+                new NativeS3InputStream(client, BUCKET, KEY, DATA.length, 
smallBuffer)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+
+            assertThat(in.skip(smallBuffer + 1)).isEqualTo(smallBuffer + 1);
+            assertThat(first.wasAborted()).isFalse();
+
+            assertThat(in.read()).isEqualTo(1 + smallBuffer + 1);
             assertThat(first.wasAborted()).isTrue();
             assertThat(first.wasClosed()).isTrue();
             assertThat(first.wasAbortedBeforeClose()).isTrue();
-            // bytes=contentLength- is unsatisfiable, so we must not reopen.
-            assertThat(client.getObjectCalls()).isEqualTo(1);
-            assertThat(in.read()).isEqualTo(-1);
-            assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
-            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(client.getObjectCalls()).isEqualTo(2);
         }
     }
 
     @Test
-    void skipToEofWithOpenStreamReleasesStream() throws Exception {
+    void skipToEofReleasesStreamOnRead() throws Exception {
         StubS3Client client = new StubS3Client(DATA);
         try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
             in.read();
@@ -333,15 +508,145 @@ class NativeS3InputStreamTest {
 
             assertThat(in.skip(DATA.length)).isEqualTo(DATA.length - 1);
             assertThat(in.getPos()).isEqualTo(DATA.length);
+            assertThat(first.wasAborted()).isFalse();
 
-            assertThat(first.wasAborted()).isTrue();
-            assertThat(first.wasClosed()).isTrue();
-            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            assertThat(in.read()).isEqualTo(-1);
+        }
+    }
+
+    @Test
+    void skipZeroAndNegativeAreNoOps() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+            assertThat(in.skip(0)).isZero();
+            assertThat(in.skip(-5)).isZero();
             assertThat(client.getObjectCalls()).isEqualTo(1);
+        }
+    }
+
+    // --- read + seek integration ---
+
+    @Test
+    void readAndSeekReturnCorrectData() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            assertThat(in.read()).isEqualTo(0);
+            assertThat(in.getPos()).isEqualTo(1);
+            byte[] buf = new byte[10];
+            assertThat(in.read(buf, 0, 10)).isEqualTo(10);
+            assertThat(in.getPos()).isEqualTo(11);
+            for (int i = 0; i < 10; i++) {
+                assertThat(buf[i]).isEqualTo(DATA[i + 1]);
+            }
+            assertThat(in.available()).isEqualTo(DATA.length - 11);
+            in.seek(200);
+            assertThat(in.read()).isEqualTo(200);
+            assertThat(in.getPos()).isEqualTo(201);
+            in.seek(250);
+            byte[] tail = new byte[20];
+            assertThat(in.read(tail, 0, 20)).isEqualTo(6);
+            assertThat(in.getPos()).isEqualTo(256);
             assertThat(in.read()).isEqualTo(-1);
+            assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+        }
+    }
+
+    // --- buffer-efficiency: skip stays in local buffer vs. underlying stream 
---
+
+    @Test
+    void seekWithinBuffer_afterSmallRead_doesNotTouchUnderlyingStream() throws 
Exception {
+        int smallBuffer = 32;
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in =
+                new NativeS3InputStream(client, BUCKET, KEY, DATA.length, 
smallBuffer)) {
+            // single-byte read opens the S3 stream and advances position to 1
+            assertThat(in.read()).isEqualTo(0);
+            TrackingInputStream underlying = client.lastStream();
+            long initialBytesRead = underlying.bytesReadFromUnderlying();
+
+            // forward seek of 9 bytes — fits in the local buffer, no 
underlying access needed
+            in.seek(10);
+            assertThat(in.read()).isEqualTo(10);
+            assertThat(in.getPos()).isEqualTo(11);
+
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(0);
+            
assertThat(underlying.bytesReadFromUnderlying()).isEqualTo(initialBytesRead);
         }
     }
 
+    @Test
+    void seekWithinBuffer_afterLargeRead_touchesUnderlyingStream() throws 
Exception {
+        int smallBuffer = 16;
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in =
+                new NativeS3InputStream(client, BUCKET, KEY, DATA.length, 
smallBuffer)) {
+            // bulk read >= bufferSize bypasses BufferedInputStream's local 
array entirely,
+            // leaving the local buffer empty afterward
+            byte[] buf = new byte[smallBuffer];
+            in.read(buf, 0, smallBuffer);
+            TrackingInputStream underlying = client.lastStream();
+
+            // forward seek of 10 bytes — within readBufferSize but buffer is 
empty, so
+            // BufferedInputStream.skip() delegates directly to the underlying 
stream
+            in.seek((long) smallBuffer + 10);
+            assertThat(in.read()).isEqualTo(DATA[smallBuffer + 10] & 0xFF);
+
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(10);
+        }
+    }
+
+    @Test
+    void 
seekBeyondReadBufferSize_inflatedByUnderlyingAvailable_reopensWithRangeRequest()
+            throws Exception {
+        int smallBuffer = 16;
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in =
+                new NativeS3InputStream(client, BUCKET, KEY, DATA.length, 
smallBuffer)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+
+            // seek 50 bytes forward — exceeds readBufferSize=16, must reopen 
with range request
+            in.seek(51);
+            assertThat(in.read()).isEqualTo(51);
+
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+            assertThat(first.bytesSkippedFromUnderlying()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void skipBytesInBuffer_skipFailureTriggersRangeRequest() throws Exception {
+        int smallBuffer = 32;
+        // FailFirstSkipTrackingInputStream returns 0 on the first skip() 
call, simulating a
+        // stalled/closed HTTP connection. BufferedInputStream propagates 0 
when its internal
+        // buffer is empty, so skipBytesInBuffer() falls back to 
openStreamAtCurrentPosition().
+        StubS3Client client =
+                new StubS3Client(DATA, Integer.MAX_VALUE, 
FailFirstSkipTrackingInputStream::new);
+        try (NativeS3InputStream in =
+                new NativeS3InputStream(client, BUCKET, KEY, DATA.length, 
smallBuffer)) {
+            // Read exactly one buffer-worth: BufferedInputStream bypasses its 
internal array for
+            // len >= bufSize reads, leaving the buffer empty after the call.
+            byte[] buf = new byte[smallBuffer];
+            in.read(buf, 0, smallBuffer);
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            // Seek 10 bytes forward — within readBufferSize, so 
skipBytesInBuffer() is chosen.
+            // With the buffer empty, BufferedInputStream delegates to the 
underlying skip(),
+            // which returns 0 on the first call, triggering 
openStreamAtCurrentPosition().
+            int seekTarget = smallBuffer + 10;
+            in.seek(seekTarget);
+
+            assertThat(in.read()).isEqualTo(DATA[seekTarget] & 0xFF);
+            assertThat(in.getPos()).isEqualTo(seekTarget + 1);
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+        }
+    }
+
+    // --- argument validation ---
+
     @Test
     void rejectsInvalidArguments() throws Exception {
         StubS3Client client = new StubS3Client(DATA);


Reply via email to