This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a423d05c839 [FLINK-39603][s3] Optimize NativeS3InputStream via seeks
and reduce IOPS
a423d05c839 is described below
commit a423d05c839e71bd49a6ae6bf831161e1af2893c
Author: Samrat <[email protected]>
AuthorDate: Mon May 11 19:17:40 2026 +0530
[FLINK-39603][s3] Optimize NativeS3InputStream via seeks and reduce IOPS
---
.../fs/s3native/NativeS3FileSystemFactory.java | 8 +-
.../flink/fs/s3native/NativeS3InputStream.java | 168 ++++----
.../flink/fs/s3native/NativeS3InputStreamTest.java | 469 +++++++++++++++++----
3 files changed, 481 insertions(+), 164 deletions(-)
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index b7c0de3acfc..f141125b625 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -371,15 +371,11 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
maxConcurrentUploads);
boolean useAsyncOperations = config.get(USE_ASYNC_OPERATIONS);
-
- // Validate and clamp read buffer size to sensible range [64KB, 4MB]
- // We clip rather than throw to provide flexibility while preventing
extreme values
int configuredReadBufferSize = config.get(READ_BUFFER_SIZE);
- int readBufferSize =
- Math.max(64 * 1024, Math.min(configuredReadBufferSize, 4 *
1024 * 1024));
+ int readBufferSize = Math.max(256 * 1024, configuredReadBufferSize);
if (readBufferSize != configuredReadBufferSize) {
LOG.warn(
- "{} value {} was outside valid range [64KB, 4MB]. Using {}
instead.",
+            "{} value {} was below 256KB. Using {} instead.",
READ_BUFFER_SIZE.key(),
configuredReadBufferSize,
readBufferSize);
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
index 32368dd94f6..dd2a80e4a70 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
@@ -35,22 +35,20 @@ import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
/**
- * S3 input stream with configurable read-ahead buffer, range-based requests
for seek operations,
- * automatic stream reopening on errors, and lazy initialization to minimize
memory footprint.
+ * S3 input stream with configurable read-ahead buffer, lazy seek, and
automatic stream reopening.
*
- * <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe
concurrent access and
- * resource cleanup.
+ * <p>{@link #seek(long)} only records the desired position without performing
any I/O. All HTTP
+ * work is deferred to the next {@link #read()} call via {@link #lazySeek()},
so multiple seeks
+ * between reads coalesce. When the seek is forward and within {@code
readBufferSize}, bytes are
+ * skipped in-buffer instead of reopening the HTTP connection.
*/
class NativeS3InputStream extends FSDataInputStream {
private static final Logger LOG =
LoggerFactory.getLogger(NativeS3InputStream.class);
- /** Default read-ahead buffer size: 256KB. */
+ /** Default read-ahead buffer size. */
private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024;
- /** Maximum buffer size for very large sequential reads. */
- private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
-
private final ReentrantLock lock = new ReentrantLock();
private final S3Client s3Client;
@@ -65,8 +63,19 @@ class NativeS3InputStream extends FSDataInputStream {
@GuardedBy("lock")
private BufferedInputStream bufferedStream;
+ /**
+ * The position the caller expects to read from next. Updated by {@link
#seek(long)}, {@link
+ * #skip(long)}, and after every successful {@link #read()}.
+ */
+ @GuardedBy("lock")
+ private long nextReadPos;
+
+ /**
+ * The actual byte offset of the underlying stream cursor, reconciled
lazily via {@link
+ * #lazySeek()}.
+ */
@GuardedBy("lock")
- private long position;
+ private long streamPos;
@GuardedBy("lock")
private volatile boolean closed;
@@ -86,8 +95,9 @@ class NativeS3InputStream extends FSDataInputStream {
this.bucketName = bucketName;
this.key = key;
this.contentLength = contentLength;
- this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE);
- this.position = 0;
+ this.readBufferSize = readBufferSize;
+ this.nextReadPos = 0;
+ this.streamPos = 0;
this.closed = false;
LOG.debug(
@@ -98,39 +108,63 @@ class NativeS3InputStream extends FSDataInputStream {
this.readBufferSize / 1024);
}
+ /** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading
bytes. */
@GuardedBy("lock")
- private void lazyInitialize() throws IOException {
- assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock
to be held";
- if (currentStream == null && !closed) {
- openStreamAtCurrentPosition();
- }
- }
+ private void lazySeek() throws IOException {
+ assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be
held";
+ long targetPos = nextReadPos;
- /** At EOF, release instead of reopening: {@code bytes=contentLength-}
returns S3 416. */
- @GuardedBy("lock")
- private void repositionOpenStream() throws IOException {
- assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires
lock to be held";
if (currentStream == null) {
+ streamPos = targetPos;
return;
}
- if (position >= contentLength) {
+
+ if (targetPos == streamPos) {
+ return;
+ }
+
+ long diff = targetPos - streamPos;
+ streamPos = targetPos;
+
+ if (targetPos >= contentLength) {
releaseStreams();
- } else {
+ return;
+ }
+
+ // BufferedInputStream does not expose how many bytes are in its local
array, so
+ // readBufferSize is used as the skip threshold: at most
readBufferSize bytes may be
+ // consumed from the live HTTP connection before a range request is
preferred instead.
+ if (diff > 0 && diff <= (long) readBufferSize) {
+ skipBytesInBuffer(diff);
+ return;
+ }
+
+ openStreamAtCurrentPosition();
+ }
+
+ @GuardedBy("lock")
+ private void ensureStreamOpen() throws IOException {
+ assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires
lock to be held";
+ if (currentStream == null && !closed) {
openStreamAtCurrentPosition();
}
}
- /**
- * Opens (or reopens) the S3 stream at the current position.
- *
- * <p>This method:
- *
- * <ul>
- * <li>Closes any existing stream
- * <li>Opens a new stream starting at {@link #position}
- * <li>Uses HTTP range requests for non-zero positions
- * </ul>
- */
+ @GuardedBy("lock")
+ private void skipBytesInBuffer(long n) throws IOException {
+ assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires
lock to be held";
+ long remaining = n;
+ while (remaining > 0) {
+ long skipped = bufferedStream.skip(remaining);
+ if (skipped <= 0) {
+ openStreamAtCurrentPosition();
+ return;
+ }
+ remaining -= skipped;
+ }
+ }
+
+ /** Opens (or reopens) the S3 stream at {@link #streamPos}. */
private void openStreamAtCurrentPosition() throws IOException {
lock.lock();
try {
@@ -140,11 +174,11 @@ class NativeS3InputStream extends FSDataInputStream {
GetObjectRequest.Builder requestBuilder =
GetObjectRequest.builder().bucket(bucketName).key(key);
- if (position > 0) {
- requestBuilder.range(String.format("bytes=%d-", position));
+ if (streamPos > 0) {
+ requestBuilder.range(String.format("bytes=%d-",
streamPos));
LOG.debug(
"Opening S3 stream with range: bytes={}-{}",
- position,
+ streamPos,
contentLength - 1);
} else {
LOG.debug("Opening S3 stream for full object: {} bytes",
contentLength);
@@ -160,12 +194,7 @@ class NativeS3InputStream extends FSDataInputStream {
}
}
- /**
- * Aborts the in-flight HTTP connection so that subsequent {@code close()}
calls on the stream
- * do not drain remaining bytes over the network.
- *
- * @see ResponseInputStream#abort()
- */
+ /** Aborts the in-flight HTTP connection to avoid draining remaining bytes
on close. */
@GuardedBy("lock")
private void abortCurrentStream() {
assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires
lock to be held";
@@ -179,11 +208,10 @@ class NativeS3InputStream extends FSDataInputStream {
}
/**
- * Aborts and closes both streams, nulling the references. The abort is
called first to prevent
- * {@link ResponseInputStream#close()} from draining remaining bytes over
the network.
+ * Aborts and closes both streams, nulling the references.
*
* @return the first {@link IOException} encountered (with subsequent ones
added as suppressed),
- * or {@code null} if cleanup succeeded without errors
+ * or {@code null} if cleanup succeeded
*/
@GuardedBy("lock")
private IOException releaseStreams() {
@@ -236,10 +264,7 @@ class NativeS3InputStream extends FSDataInputStream {
+ contentLength);
}
- if (desired != position) {
- position = desired;
- repositionOpenStream();
- }
+ nextReadPos = desired;
} finally {
lock.unlock();
}
@@ -249,7 +274,7 @@ class NativeS3InputStream extends FSDataInputStream {
public long getPos() throws IOException {
lock();
try {
- return position;
+ return nextReadPos;
} finally {
lock.unlock();
}
@@ -262,13 +287,15 @@ class NativeS3InputStream extends FSDataInputStream {
if (closed) {
throw new IOException("Stream is closed");
}
- if (position >= contentLength) {
+ if (nextReadPos >= contentLength) {
return -1;
}
- lazyInitialize();
+ lazySeek();
+ ensureStreamOpen();
int data = bufferedStream.read();
if (data != -1) {
- position++;
+ nextReadPos++;
+ streamPos++;
}
return data;
} finally {
@@ -295,15 +322,17 @@ class NativeS3InputStream extends FSDataInputStream {
if (closed) {
throw new IOException("Stream is closed");
}
- if (position >= contentLength) {
+ if (nextReadPos >= contentLength) {
return -1;
}
- lazyInitialize();
- long remaining = contentLength - position;
+ lazySeek();
+ ensureStreamOpen();
+ long remaining = contentLength - nextReadPos;
int toRead = (int) Math.min(len, remaining);
int bytesRead = bufferedStream.read(b, off, toRead);
if (bytesRead > 0) {
- position += bytesRead;
+ nextReadPos += bytesRead;
+ streamPos += bytesRead;
}
return bytesRead;
} finally {
@@ -336,7 +365,7 @@ class NativeS3InputStream extends FSDataInputStream {
"Closed S3 input stream - bucket: {}, key: {}, final
position: {}/{}",
bucketName,
key,
- position,
+ nextReadPos,
contentLength);
if (exception != null) {
throw exception;
@@ -346,16 +375,6 @@ class NativeS3InputStream extends FSDataInputStream {
}
}
- /**
- * Returns an estimate of the number of bytes that can be read without
blocking.
- *
- * <p>This implementation returns the remaining bytes in the object based
on content length and
- * current position. Note that actual reads may still block due to network
I/O, but this
- * indicates how much data is logically available.
- *
- * @return the number of remaining bytes (capped at Integer.MAX_VALUE)
- * @throws IOException if the stream has been closed
- */
@Override
public int available() throws IOException {
lock();
@@ -363,7 +382,7 @@ class NativeS3InputStream extends FSDataInputStream {
if (closed) {
throw new IOException("Stream is closed");
}
- long remaining = contentLength - position;
+ long remaining = contentLength - nextReadPos;
return (int) Math.min(remaining, Integer.MAX_VALUE);
} finally {
lock.unlock();
@@ -380,12 +399,9 @@ class NativeS3InputStream extends FSDataInputStream {
if (n <= 0) {
return 0;
}
- long newPos = Math.min(position + n, contentLength);
- long skipped = newPos - position;
- if (newPos != position) {
- position = newPos;
- repositionOpenStream();
- }
+ long newPos = Math.min(nextReadPos + n, contentLength);
+ long skipped = newPos - nextReadPos;
+ nextReadPos = newPos;
return skipped;
} finally {
lock.unlock();
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
index 90591fcd94a..32316e18bbc 100644
---
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
@@ -33,6 +33,7 @@ import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -57,23 +58,47 @@ class NativeS3InputStreamTest {
private final AtomicBoolean aborted = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private volatile boolean abortedBeforeClose;
+ private final int maxAvailable;
+ private final AtomicLong bytesRead = new AtomicLong();
+ private final AtomicLong bytesSkipped = new AtomicLong();
TrackingInputStream(byte[] data, int offset) {
- this.delegate = new ByteArrayInputStream(data, offset, data.length
- offset);
+ this(data, offset, Integer.MAX_VALUE);
}
- TrackingInputStream(byte[] data) {
- this(data, 0);
+ TrackingInputStream(byte[] data, int offset, int maxAvailable) {
+ this.delegate = new ByteArrayInputStream(data, offset, data.length
- offset);
+ this.maxAvailable = maxAvailable;
}
@Override
public int read() {
- return delegate.read();
+ int b = delegate.read();
+ if (b != -1) {
+ bytesRead.incrementAndGet();
+ }
+ return b;
}
@Override
public int read(byte[] b, int off, int len) {
- return delegate.read(b, off, len);
+ int n = delegate.read(b, off, len);
+ if (n > 0) {
+ bytesRead.addAndGet(n);
+ }
+ return n;
+ }
+
+ @Override
+ public long skip(long n) {
+ long skipped = delegate.skip(n);
+ bytesSkipped.addAndGet(skipped);
+ return skipped;
+ }
+
+ @Override
+ public int available() {
+ return Math.min(delegate.available(), maxAvailable);
}
@Override
@@ -101,6 +126,39 @@ class NativeS3InputStreamTest {
boolean wasAbortedBeforeClose() {
return abortedBeforeClose;
}
+
+ long bytesReadFromUnderlying() {
+ return bytesRead.get();
+ }
+
+ long bytesSkippedFromUnderlying() {
+ return bytesSkipped.get();
+ }
+ }
+
+ /**
+ * A {@link TrackingInputStream} whose first {@link #skip} call returns
{@code 0}, simulating a
+ * stalled/closed underlying HTTP connection during in-buffer skipping.
+ */
+ private static final class FailFirstSkipTrackingInputStream extends
TrackingInputStream {
+ private final AtomicBoolean firstSkipFailed = new AtomicBoolean();
+
+ FailFirstSkipTrackingInputStream(byte[] data, int offset, int
maxAvailable) {
+ super(data, offset, maxAvailable);
+ }
+
+ @Override
+ public long skip(long n) {
+ if (firstSkipFailed.compareAndSet(false, true)) {
+ return 0;
+ }
+ return super.skip(n);
+ }
+ }
+
+ @FunctionalInterface
+ private interface StreamFactory {
+ TrackingInputStream create(byte[] data, int offset, int maxAvailable);
}
/** {@link S3Client} stub. */
@@ -108,9 +166,21 @@ class NativeS3InputStreamTest {
private final byte[] data;
private final AtomicInteger getObjectCalls = new AtomicInteger();
private volatile TrackingInputStream lastStream;
+ private final int maxAvailable;
+ private final StreamFactory factory;
StubS3Client(byte[] data) {
+ this(data, Integer.MAX_VALUE);
+ }
+
+ StubS3Client(byte[] data, int maxAvailable) {
+ this(data, maxAvailable, TrackingInputStream::new);
+ }
+
+ StubS3Client(byte[] data, int maxAvailable, StreamFactory factory) {
this.data = data;
+ this.maxAvailable = maxAvailable;
+ this.factory = factory;
}
@Override
@@ -121,7 +191,7 @@ class NativeS3InputStreamTest {
if (range != null && range.startsWith("bytes=")) {
offset = Integer.parseInt(range.substring(6,
range.indexOf('-')));
}
- TrackingInputStream tracking = new TrackingInputStream(data,
offset);
+ TrackingInputStream tracking = factory.create(data, offset,
maxAvailable);
lastStream = tracking;
AbortableInputStream abortable =
AbortableInputStream.create(tracking, tracking);
return new ResponseInputStream<>(
@@ -145,6 +215,8 @@ class NativeS3InputStreamTest {
}
}
+ // --- close behavior ---
+
@Test
void closeAbortsUnderlyingStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
@@ -162,116 +234,200 @@ class NativeS3InputStreamTest {
in.read();
}
TrackingInputStream stream = client.lastStream();
- // abort() must be called to kill the HTTP connection (prevents drain)
assertThat(stream.wasAborted()).isTrue();
- // close() must still be called for SDK resource cleanup (connection
pool return, etc.)
assertThat(stream.wasClosed()).isTrue();
- // abort() must happen BEFORE close() - otherwise close() drains
remaining bytes
assertThat(stream.wasAbortedBeforeClose()).isTrue();
}
@Test
- void seekAbortsAndClosesOldStreamBeforeOpeningNew() throws Exception {
+ void closeWithoutReadNeverOpensStream() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {}
+ assertThat(client.getObjectCalls()).isEqualTo(0);
+ }
+
+ @Test
+ void doubleCloseIsIdempotent() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY,
DATA.length);
+ in.read();
+
+ in.close();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(client.lastStream().wasAborted()).isTrue();
+
+ in.close();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(client.lastStream().wasAborted()).isTrue();
+ }
+
+ // --- lazy seek: seek() does no I/O ---
+
+ @Test
+ void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
+ in.seek(50);
+ assertThat(in.getPos()).isEqualTo(50);
+ assertThat(client.getObjectCalls()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ void seekDoesNotTriggerStreamOperations() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
in.read();
TrackingInputStream first = client.lastStream();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
- in.seek(100);
+ in.seek(0);
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(first.wasClosed()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(in.getPos()).isEqualTo(0);
+ }
+ }
- // old stream must be aborted, closed, and in the correct order
- assertThat(first.wasAborted()).isTrue();
- assertThat(first.wasClosed()).isTrue();
- assertThat(first.wasAbortedBeforeClose()).isTrue();
- assertThat(client.getObjectCalls()).isEqualTo(2);
- assertThat(in.getPos()).isEqualTo(100);
+ @Test
+ void multipleSeeksWithoutReadCoalesce() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
+ in.read();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ in.seek(200);
+ in.seek(0);
in.seek(100);
- assertThat(client.getObjectCalls()).isEqualTo(2);
+ in.seek(50);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ assertThat(in.read()).isEqualTo(50);
+ assertThat(in.getPos()).isEqualTo(51);
}
}
+ // --- seek + read: forward within buffer ---
+
@Test
- void skipAbortsOldStreamAndOpensNew() throws Exception {
+ void seekForwardWithinBufferReusesStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
in.read();
TrackingInputStream first = client.lastStream();
- assertThat(in.skip(100)).isEqualTo(100);
- assertThat(first.wasAborted()).isTrue();
- assertThat(first.wasClosed()).isTrue();
- assertThat(first.wasAbortedBeforeClose()).isTrue();
- assertThat(client.getObjectCalls()).isEqualTo(2);
- // skip(0) and skip(negative) are no-ops
- assertThat(in.skip(0)).isZero();
- assertThat(in.skip(-5)).isZero();
- assertThat(client.getObjectCalls()).isEqualTo(2);
+
+ in.seek(100);
+ assertThat(in.read()).isEqualTo(100);
+
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(first.wasClosed()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(in.getPos()).isEqualTo(101);
+
+ in.seek(101);
+ assertThat(in.read()).isEqualTo(101);
+ assertThat(client.getObjectCalls()).isEqualTo(1);
}
}
@Test
- void closeWithoutReadNeverOpensStream() throws Exception {
+ void seekForwardWithinBufferReturnsCorrectData() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
- // lazy init means no getObject call
+ assertThat(in.read()).isEqualTo(0);
+ assertThat(in.read()).isEqualTo(1);
+
+ in.seek(10);
+ assertThat(in.read()).isEqualTo(10);
+
+ in.seek(50);
+ byte[] buf = new byte[5];
+ assertThat(in.read(buf, 0, 5)).isEqualTo(5);
+ for (int i = 0; i < 5; i++) {
+ assertThat(buf[i]).isEqualTo(DATA[50 + i]);
+ }
+
+ in.seek(200);
+ assertThat(in.read()).isEqualTo(200 & 0xFF);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
}
- assertThat(client.getObjectCalls()).isEqualTo(0);
}
@Test
- void doubleCloseIsIdempotent() throws Exception {
+ void sequentialSmallSeeksReuseStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
- NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY,
DATA.length);
- in.read();
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
+ for (int i = 0; i < 100; i++) {
+ in.seek(i * 2);
+ assertThat(in.read()).isEqualTo((i * 2) & 0xFF);
+ }
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ }
+ }
- // Verify state after first close
- in.close();
- assertThat(client.getObjectCalls()).isEqualTo(1);
- assertThat(client.lastStream().wasAborted()).isTrue();
+ // --- seek + read: forward beyond buffer ---
- // Second close should be a no-op
- in.close();
- assertThat(client.getObjectCalls()).isEqualTo(1);
- assertThat(client.lastStream().wasAborted()).isTrue();
+ @Test
+ void seekForwardBeyondBufferReopensStream() throws Exception {
+ StubS3Client client = new StubS3Client(DATA, 0);
+ int smallBuffer = 16;
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length,
smallBuffer)) {
+ in.read();
+ TrackingInputStream first = client.lastStream();
+
+ in.seek(1 + smallBuffer + 1);
+ assertThat(first.wasAborted()).isFalse();
+
+ assertThat(in.read()).isEqualTo(1 + smallBuffer + 1);
+ assertThat(first.wasAborted()).isTrue();
+ assertThat(first.wasClosed()).isTrue();
+ assertThat(first.wasAbortedBeforeClose()).isTrue();
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ }
}
+ // --- seek + read: backward ---
+
@Test
- void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+ void seekBackwardReopensStreamOnRead() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
- in.seek(50);
- assertThat(in.getPos()).isEqualTo(50);
- assertThat(client.getObjectCalls()).isEqualTo(0);
+ byte[] buf = new byte[50];
+ in.read(buf, 0, 50);
+ TrackingInputStream first = client.lastStream();
+
+ in.seek(10);
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ assertThat(in.read()).isEqualTo(10);
+ assertThat(first.wasAborted()).isTrue();
+ assertThat(first.wasClosed()).isTrue();
+ assertThat(first.wasAbortedBeforeClose()).isTrue();
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ assertThat(in.getPos()).isEqualTo(11);
}
}
+ // --- seek to EOF ---
+
@Test
- void readAndSeekReturnCorrectData() throws Exception {
+ void seekToContentLengthReleasesStreamOnRead() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
- // single-byte read
- assertThat(in.read()).isEqualTo(0);
- assertThat(in.getPos()).isEqualTo(1);
- // bulk read returns correct bytes and advances position
- byte[] buf = new byte[10];
- assertThat(in.read(buf, 0, 10)).isEqualTo(10);
- assertThat(in.getPos()).isEqualTo(11);
- for (int i = 0; i < 10; i++) {
- assertThat(buf[i]).isEqualTo(DATA[i + 1]);
- }
- // available() reflects remaining bytes
- assertThat(in.available()).isEqualTo(DATA.length - 11);
- // seek then read returns data at the seeked position
- in.seek(200);
- assertThat(in.read()).isEqualTo(200);
- assertThat(in.getPos()).isEqualTo(201);
- // partial read at EOF returns only remaining bytes
- in.seek(250);
- byte[] tail = new byte[20];
- assertThat(in.read(tail, 0, 20)).isEqualTo(6);
- assertThat(in.getPos()).isEqualTo(256);
- // read past EOF
+ in.read();
+ TrackingInputStream first = client.lastStream();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ in.seek(DATA.length);
+ assertThat(first.wasAborted()).isFalse();
+
assertThat(in.read()).isEqualTo(-1);
- assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+ assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
+ assertThat(client.getObjectCalls()).isEqualTo(1);
}
}
@@ -297,34 +453,53 @@ class NativeS3InputStreamTest {
assertThat(in.read(new byte[8], 0, 8)).isEqualTo(-1);
assertThat(in.getPos()).isEqualTo(DATA.length);
assertThat(in.available()).isZero();
- // EOF short-circuits before lazyInitialize, so no range request
is issued.
assertThat(client.getObjectCalls()).isZero();
}
}
+ // --- skip (also lazy) ---
+
@Test
- void seekToContentLengthWithOpenStreamReleasesStream() throws Exception {
+ void skipForwardWithinBufferReusesStream() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
in.read();
TrackingInputStream first = client.lastStream();
+
+ assertThat(in.skip(50)).isEqualTo(50);
+ assertThat(in.getPos()).isEqualTo(51);
+
+ assertThat(first.wasAborted()).isFalse();
assertThat(client.getObjectCalls()).isEqualTo(1);
- in.seek(DATA.length);
+ assertThat(in.read()).isEqualTo(51);
+ assertThat(first.wasAborted()).isFalse();
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ }
+ }
+ @Test
+ void skipForwardBeyondBufferReopensStreamOnRead() throws Exception {
+ StubS3Client client = new StubS3Client(DATA, 0);
+ int smallBuffer = 16;
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length,
smallBuffer)) {
+ in.read();
+ TrackingInputStream first = client.lastStream();
+
+ assertThat(in.skip(smallBuffer + 1)).isEqualTo(smallBuffer + 1);
+ assertThat(first.wasAborted()).isFalse();
+
+ assertThat(in.read()).isEqualTo(1 + smallBuffer + 1);
assertThat(first.wasAborted()).isTrue();
assertThat(first.wasClosed()).isTrue();
assertThat(first.wasAbortedBeforeClose()).isTrue();
- // bytes=contentLength- is unsatisfiable, so we must not reopen.
- assertThat(client.getObjectCalls()).isEqualTo(1);
- assertThat(in.read()).isEqualTo(-1);
- assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
- assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(client.getObjectCalls()).isEqualTo(2);
}
}
@Test
- void skipToEofWithOpenStreamReleasesStream() throws Exception {
+ void skipToEofReleasesStreamOnRead() throws Exception {
StubS3Client client = new StubS3Client(DATA);
try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
in.read();
@@ -333,15 +508,145 @@ class NativeS3InputStreamTest {
assertThat(in.skip(DATA.length)).isEqualTo(DATA.length - 1);
assertThat(in.getPos()).isEqualTo(DATA.length);
+ assertThat(first.wasAborted()).isFalse();
- assertThat(first.wasAborted()).isTrue();
- assertThat(first.wasClosed()).isTrue();
- assertThat(first.wasAbortedBeforeClose()).isTrue();
+ assertThat(in.read()).isEqualTo(-1);
+ }
+ }
+
+ @Test
+ void skipZeroAndNegativeAreNoOps() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
+ in.read();
+ assertThat(in.skip(0)).isZero();
+ assertThat(in.skip(-5)).isZero();
assertThat(client.getObjectCalls()).isEqualTo(1);
+ }
+ }
+
+ // --- read + seek integration ---
+
+ @Test
+ void readAndSeekReturnCorrectData() throws Exception {
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET,
KEY, DATA.length)) {
+ assertThat(in.read()).isEqualTo(0);
+ assertThat(in.getPos()).isEqualTo(1);
+ byte[] buf = new byte[10];
+ assertThat(in.read(buf, 0, 10)).isEqualTo(10);
+ assertThat(in.getPos()).isEqualTo(11);
+ for (int i = 0; i < 10; i++) {
+ assertThat(buf[i]).isEqualTo(DATA[i + 1]);
+ }
+ assertThat(in.available()).isEqualTo(DATA.length - 11);
+ in.seek(200);
+ assertThat(in.read()).isEqualTo(200);
+ assertThat(in.getPos()).isEqualTo(201);
+ in.seek(250);
+ byte[] tail = new byte[20];
+ assertThat(in.read(tail, 0, 20)).isEqualTo(6);
+ assertThat(in.getPos()).isEqualTo(256);
assertThat(in.read()).isEqualTo(-1);
+ assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+ }
+ }
+
+ // --- buffer-efficiency: skip stays in local buffer vs. underlying stream
---
+
+ @Test
+ void seekWithinBuffer_afterSmallRead_doesNotTouchUnderlyingStream() throws
Exception {
+ int smallBuffer = 32;
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length,
smallBuffer)) {
+ // single-byte read opens the S3 stream and advances position to 1
+ assertThat(in.read()).isEqualTo(0);
+ TrackingInputStream underlying = client.lastStream();
+ long initialBytesRead = underlying.bytesReadFromUnderlying();
+
+ // forward seek of 9 bytes — fits in the local buffer, no
underlying access needed
+ in.seek(10);
+ assertThat(in.read()).isEqualTo(10);
+ assertThat(in.getPos()).isEqualTo(11);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(0);
+
assertThat(underlying.bytesReadFromUnderlying()).isEqualTo(initialBytesRead);
}
}
+ @Test
+ void seekWithinBuffer_afterLargeRead_touchesUnderlyingStream() throws
Exception {
+ int smallBuffer = 16;
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length,
smallBuffer)) {
+ // bulk read >= bufferSize bypasses BufferedInputStream's local
array entirely,
+ // leaving the local buffer empty afterward
+ byte[] buf = new byte[smallBuffer];
+ in.read(buf, 0, smallBuffer);
+ TrackingInputStream underlying = client.lastStream();
+
+ // forward seek of 10 bytes — within readBufferSize but buffer is
empty, so
+ // BufferedInputStream.skip() delegates directly to the underlying
stream
+ in.seek((long) smallBuffer + 10);
+ assertThat(in.read()).isEqualTo(DATA[smallBuffer + 10] & 0xFF);
+
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+ assertThat(underlying.bytesSkippedFromUnderlying()).isEqualTo(10);
+ }
+ }
+
+ @Test
+ void
seekBeyondReadBufferSize_inflatedByUnderlyingAvailable_reopensWithRangeRequest()
+ throws Exception {
+ int smallBuffer = 16;
+ StubS3Client client = new StubS3Client(DATA);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length,
smallBuffer)) {
+ in.read();
+ TrackingInputStream first = client.lastStream();
+
+ // seek 50 bytes forward — exceeds readBufferSize=16, must reopen
with range request
+ in.seek(51);
+ assertThat(in.read()).isEqualTo(51);
+
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ assertThat(first.bytesSkippedFromUnderlying()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ void skipBytesInBuffer_skipFailureTriggersRangeRequest() throws Exception {
+ int smallBuffer = 32;
+ // FailFirstSkipTrackingInputStream returns 0 on the first skip()
call, simulating a
+ // stalled/closed HTTP connection. BufferedInputStream propagates 0
when its internal
+ // buffer is empty, so skipBytesInBuffer() falls back to
openStreamAtCurrentPosition().
+ StubS3Client client =
+ new StubS3Client(DATA, Integer.MAX_VALUE,
FailFirstSkipTrackingInputStream::new);
+ try (NativeS3InputStream in =
+ new NativeS3InputStream(client, BUCKET, KEY, DATA.length,
smallBuffer)) {
+ // Read exactly one buffer-worth: BufferedInputStream bypasses its
internal array for
+ // len >= bufSize reads, leaving the buffer empty after the call.
+ byte[] buf = new byte[smallBuffer];
+ in.read(buf, 0, smallBuffer);
+ assertThat(client.getObjectCalls()).isEqualTo(1);
+
+ // Seek 10 bytes forward — within readBufferSize, so
skipBytesInBuffer() is chosen.
+ // With the buffer empty, BufferedInputStream delegates to the
underlying skip(),
+ // which returns 0 on the first call, triggering
openStreamAtCurrentPosition().
+ int seekTarget = smallBuffer + 10;
+ in.seek(seekTarget);
+
+ assertThat(in.read()).isEqualTo(DATA[seekTarget] & 0xFF);
+ assertThat(in.getPos()).isEqualTo(seekTarget + 1);
+ assertThat(client.getObjectCalls()).isEqualTo(2);
+ }
+ }
+
+ // --- argument validation ---
+
@Test
void rejectsInvalidArguments() throws Exception {
StubS3Client client = new StubS3Client(DATA);