This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f1f0b770ac feat: S3 SegmentRangeReader for partial segment downloads (#19560)
2f1f0b770ac is described below

commit 2f1f0b770ac3d289b8c2782bc0fc10865fce9737
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 9 11:06:19 2026 -0700

    feat: S3 SegmentRangeReader for partial segment downloads (#19560)
    
    changes:

    * Adds a new `S3SegmentRangeReader` that wraps `ServerSideEncryptingAmazonS3` + bucket + key prefix and issues closed-range `GetObjectRequest`s against `keyPrefix + filename`. The returned stream is wrapped in a `RetryingInputStream` with the `S3Utils.S3RETRY` predicate [...]
    * Adds a new `rangeable` boolean on `S3LoadSpec`, stamped by the pusher at write time. `S3LoadSpec.openRangeReader()` returns a reader iff the flag is `true` and the key isn't a `.zip`.
    * `S3DataSegmentPusher.pushNoZip` stamps `rangeable=true` when `binaryVersion` is `V10_VERSION` and `rangeable=false` otherwise; `pushZip` omits the field.
---
 extensions-core/s3-extensions/pom.xml              |  10 +
 .../druid/storage/s3/S3DataSegmentPusher.java      |  29 ++-
 .../org/apache/druid/storage/s3/S3LoadSpec.java    |  43 +++-
 .../druid/storage/s3/S3SegmentRangeReader.java     | 151 +++++++++++++
 .../druid/storage/s3/S3DataSegmentPusherTest.java  |  97 ++++++++-
 .../apache/druid/storage/s3/S3LoadSpecTest.java    |  79 +++++++
 .../druid/storage/s3/S3SegmentRangeReaderTest.java | 240 +++++++++++++++++++++
 7 files changed, 638 insertions(+), 11 deletions(-)

diff --git a/extensions-core/s3-extensions/pom.xml 
b/extensions-core/s3-extensions/pom.xml
index 15c8fd1773f..4bd06a6c655 100644
--- a/extensions-core/s3-extensions/pom.xml
+++ b/extensions-core/s3-extensions/pom.xml
@@ -216,6 +216,16 @@
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers-minio</artifactId>
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
index bd9176842ab..12d2cde8f65 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
@@ -135,9 +136,13 @@ public class S3DataSegmentPusher implements 
DataSegmentPusher
       }
     }
 
+    final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
+    // V10 unzipped is rangeable: a single druid.segment with a range-readable 
header. V9 unzipped is a directory of
+    // separate smoosh files the range-read path can't consume.
+    final boolean rangeable = binaryVersion == IndexIO.V10_VERSION;
     return baseSegment.withSize(size)
-                      .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path))
-                      
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+                      .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path, 
rangeable))
+                      .withBinaryVersion(binaryVersion);
   }
 
   @Override
@@ -165,6 +170,26 @@ public class S3DataSegmentPusher implements 
DataSegmentPusher
     );
   }
 
+  /**
+   * Variant that stamps {@link S3LoadSpec#RANGEABLE} so {@link 
S3LoadSpec#openRangeReader()} can decide range-read
+   * eligibility. Used by the unzipped push path where the binary version is 
known at write time.
+   */
+  private Map<String, Object> makeLoadSpec(String bucket, String key, boolean 
rangeable)
+  {
+    return ImmutableMap.of(
+        "type",
+        "s3_zip",
+        "bucket",
+        bucket,
+        "key",
+        key,
+        "S3Schema",
+        "s3n",
+        S3LoadSpec.RANGEABLE,
+        rangeable
+    );
+  }
+
   private static IOException handlePushServiceException(S3Exception e, long 
indexSize)
   {
     if (S3Utils.ERROR_ENTITY_TOO_LARGE.equals(S3Utils.getS3ErrorCode(e))) {
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
index 5338abfe414..ec8fca91374 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
@@ -21,13 +21,17 @@ package org.apache.druid.storage.s3;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.segment.loading.LoadSpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.utils.CompressionUtils;
 
+import javax.annotation.Nullable;
 import java.io.File;
 
 /**
@@ -36,22 +40,33 @@ import java.io.File;
 @JsonTypeName(S3StorageDruidModule.SCHEME_S3_ZIP)
 public class S3LoadSpec implements LoadSpec
 {
+  static final String RANGEABLE = "rangeable";
+
   private final String bucket;
   private final String key;
 
+  /**
+   * Stamped at push time when {@link S3DataSegmentPusher} writes a segment in 
a layout that supports byte-range reads
+   * Only {@code Boolean.TRUE} enables {@link #openRangeReader()}; absence or 
{@code false} means full-download.
+   */
+  @Nullable
+  private final Boolean rangeable;
+
   private final S3DataSegmentPuller puller;
 
   @JsonCreator
   public S3LoadSpec(
       @JacksonInject S3DataSegmentPuller puller,
       @JsonProperty(S3DataSegmentPuller.BUCKET) String bucket,
-      @JsonProperty(S3DataSegmentPuller.KEY) String key
+      @JsonProperty(S3DataSegmentPuller.KEY) String key,
+      @JsonProperty(RANGEABLE) @Nullable Boolean rangeable
   )
   {
     Preconditions.checkNotNull(bucket);
     Preconditions.checkNotNull(key);
     this.bucket = bucket;
     this.key = key;
+    this.rangeable = rangeable;
     this.puller = puller;
   }
 
@@ -61,6 +76,20 @@ public class S3LoadSpec implements LoadSpec
     return new LoadSpecResult(puller.getSegmentFiles(new 
CloudObjectLocation(bucket, key), outDir).size());
   }
 
+  /**
+   * Returns a {@link S3SegmentRangeReader} when the segment was stamped 
{@link #rangeable} {@code true} at push time
+   * and isn't a zip; otherwise {@code null}.
+   */
+  @Nullable
+  @Override
+  public SegmentRangeReader openRangeReader()
+  {
+    if (CompressionUtils.isZip(key) || !Boolean.TRUE.equals(rangeable)) {
+      return null;
+    }
+    return new S3SegmentRangeReader(puller.s3Client, bucket, key);
+  }
+
   @JsonProperty(S3DataSegmentPuller.BUCKET)
   public String getBucket()
   {
@@ -72,4 +101,16 @@ public class S3LoadSpec implements LoadSpec
   {
     return key;
   }
+
+  /**
+   * Returns the range-reads-supported flag stamped at push time, or {@code 
null} for legacy segments pushed before
+   * this field existed (which will load via the full-download path).
+   */
+  @JsonProperty(RANGEABLE)
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public Boolean getRangeable()
+  {
+    return rangeable;
+  }
 }
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3SegmentRangeReader.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3SegmentRangeReader.java
new file mode 100644
index 00000000000..9f88380c575
--- /dev/null
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3SegmentRangeReader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.RetryingInputStream;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link SegmentRangeReader} backed by S3 HTTP {@code Range} requests. The 
segment is expected to be stored as raw
+ * (unzipped) files under a common key prefix, i.e. the layout produced by 
{@code S3DataSegmentPusher.pushNoZip} where
+ * each segment file is uploaded as {@code keyPrefix + file.getName()}. Each 
{@link #readRange} call resolves the
+ * target object key as {@code keyPrefix + filename} and issues a closed 
byte-range GET
+ * ({@code bytes=offset-(offset+length-1)}).
+ * <p>
+ * The returned stream is wrapped in a {@link RetryingInputStream} with the 
{@link S3Utils#S3RETRY} predicate, the
+ * same retry policy {@link S3DataSegmentPuller} uses for full-segment 
downloads. Segment loading from deep storage
+ * needs retry semantics built into the reader so callers don't each reinvent 
it. The retrying wrapper reopens at the
+ * byte offset where it failed, so a transient mid-stream error becomes a 
fresh range request for the remaining bytes
+ * rather than restarting the whole read.
+ */
+public class S3SegmentRangeReader implements SegmentRangeReader
+{
+  private final ServerSideEncryptingAmazonS3 s3Client;
+  private final String bucket;
+  private final String keyPrefix;
+
+  public S3SegmentRangeReader(ServerSideEncryptingAmazonS3 s3Client, String 
bucket, String keyPrefix)
+  {
+    this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
+    this.bucket = Preconditions.checkNotNull(bucket, "bucket");
+    this.keyPrefix = Preconditions.checkNotNull(keyPrefix, "keyPrefix");
+  }
+
+  @Override
+  public InputStream readRange(String filename, long offset, long length) 
throws IOException
+  {
+    Preconditions.checkNotNull(filename, "filename");
+    Preconditions.checkArgument(offset >= 0, "offset must be non-negative, got 
[%s]", offset);
+    Preconditions.checkArgument(length >= 0, "length must be non-negative, got 
[%s]", length);
+
+    if (length == 0) {
+      // SegmentFileBuilderV10 allows zero-length internal-file entries, 
short-circuit
+      return new ByteArrayInputStream(new byte[0]);
+    }
+    return new RetryingInputStream<>(
+        new RangeRequest(keyPrefix + filename, offset, length),
+        new RangeOpenFunction(s3Client, bucket),
+        S3Utils.S3RETRY,
+        null
+    );
+  }
+
+  /**
+   * Immutable description of a range read. Held as the {@code object} of 
{@link RetryingInputStream} so retries can
+   * reopen with knowledge of the original offset and length without 
rebuilding the request from scratch.
+   */
+  private static final class RangeRequest
+  {
+    final String objectKey;
+    final long offset;
+    final long length;
+
+    RangeRequest(String objectKey, long offset, long length)
+    {
+      this.objectKey = objectKey;
+      this.offset = offset;
+      this.length = length;
+    }
+  }
+
+  /**
+   * Opens (or reopens, on retry) an S3 range read for a {@link RangeRequest}. 
The {@code start} argument is the
+   * number of bytes already successfully consumed from the logical stream, so 
the absolute S3 byte range is
+   * {@code [request.offset + start, request.offset + request.length - 1]}.
+   */
+  private static final class RangeOpenFunction implements 
ObjectOpenFunction<RangeRequest>
+  {
+    private final ServerSideEncryptingAmazonS3 s3Client;
+    private final String bucket;
+
+    RangeOpenFunction(ServerSideEncryptingAmazonS3 s3Client, String bucket)
+    {
+      this.s3Client = s3Client;
+      this.bucket = bucket;
+    }
+
+    @Override
+    public InputStream open(RangeRequest request) throws IOException
+    {
+      return open(request, 0L);
+    }
+
+    @Override
+    public InputStream open(RangeRequest request, long start) throws 
IOException
+    {
+      final long absoluteStart = request.offset + start;
+      final long absoluteEnd = request.offset + request.length - 1;
+      if (absoluteStart > absoluteEnd) {
+        // Logically nothing left to read, only reachable if a retry fires 
after the consumer drained the entire
+        // range successfully, which shouldn't happen in practice. Returning 
empty keeps us robust either way.
+        return new ByteArrayInputStream(new byte[0]);
+      }
+      final GetObjectRequest.Builder requestBuilder = 
GetObjectRequest.builder()
+          .bucket(bucket)
+          .key(request.objectKey)
+          .range(AwsBytesRange.of(absoluteStart, absoluteEnd).getBytesRange());
+      try {
+        final InputStream s3Object = s3Client.getObject(requestBuilder);
+        if (s3Object == null) {
+          throw new ISE(
+              "Failed to get s3 object for bucket[%s], key[%s], 
range[bytes=%d-%d]",
+              bucket,
+              request.objectKey,
+              absoluteStart,
+              absoluteEnd
+          );
+        }
+        return s3Object;
+      }
+      catch (S3Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+}
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
index 6bde494d900..590a36524cf 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
@@ -115,7 +115,80 @@ public class S3DataSegmentPusherTest
     };
     config.setBucket("bucket");
     config.setBaseKey("key");
-    validate(false, 
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/", s3Client, 
config);
+    DataSegment segment = validate(
+        false,
+        "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/",
+        s3Client,
+        config,
+        new byte[]{0x0, 0x0, 0x0, 0x1}
+    );
+    // V1 (test fixture) → not V10 → rangeable stamped as false (skips legacy 
HEAD probe).
+    Assert.assertEquals(Boolean.FALSE, segment.getLoadSpec().get("rangeable"));
+  }
+
+  @Test
+  public void testPushNoZipV10StampsRangeableTrue() throws Exception
+  {
+    ServerSideEncryptingAmazonS3 s3Client = 
EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
+
+    Grant grant = Grant.builder()
+        
.grantee(Grantee.builder().id("ownerId").type(Type.CANONICAL_USER).build())
+        .permission(Permission.FULL_CONTROL)
+        .build();
+    
EasyMock.expect(s3Client.getBucketOwnerGrant(EasyMock.eq("bucket"))).andReturn(grant).once();
+
+    s3Client.upload(EasyMock.anyString(), EasyMock.anyString(), 
EasyMock.anyObject(File.class), EasyMock.anyObject(Grant.class));
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(s3Client);
+
+    S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig()
+    {
+      @Override
+      public boolean isZip()
+      {
+        return false;
+      }
+    };
+    config.setBucket("bucket");
+    config.setBaseKey("key");
+
+    // version.bin = [0, 0, 0, 0x0A] → IndexIO.V10_VERSION
+    DataSegment segment = validate(
+        false,
+        "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/",
+        s3Client,
+        config,
+        new byte[]{0x0, 0x0, 0x0, 0x0A}
+    );
+    Assert.assertEquals(10, (int) segment.getBinaryVersion());
+    Assert.assertEquals(Boolean.TRUE, segment.getLoadSpec().get("rangeable"));
+  }
+
+  @Test
+  public void testPushZipDoesNotStampRangeable() throws Exception
+  {
+    // Zip path uses the no-flag makeLoadSpec overload; openRangeReader 
returns null on the zip-key short circuit
+    // regardless, but we keep the loadSpec JSON compact for zipped segments 
by omitting the field entirely.
+    ServerSideEncryptingAmazonS3 s3Client = 
EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
+
+    Grant grant = Grant.builder()
+        
.grantee(Grantee.builder().id("ownerId").type(Type.CANONICAL_USER).build())
+        .permission(Permission.FULL_CONTROL)
+        .build();
+    
EasyMock.expect(s3Client.getBucketOwnerGrant(EasyMock.eq("bucket"))).andReturn(grant).once();
+
+    s3Client.upload(EasyMock.anyString(), EasyMock.anyString(), 
EasyMock.anyObject(File.class), EasyMock.anyObject(Grant.class));
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(s3Client);
+
+    DataSegment segment = validate(
+        false,
+        
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip",
+        s3Client
+    );
+    Assert.assertFalse(segment.getLoadSpec().containsKey("rangeable"));
   }
 
   private void testPushInternal(boolean useUniquePath, String matcher) throws 
Exception
@@ -162,24 +235,32 @@ public class S3DataSegmentPusherTest
     validate(useUniquePath, matcher, s3Client);
   }
 
-  private void validate(boolean useUniquePath, String matcher, 
ServerSideEncryptingAmazonS3 s3Client) throws IOException
+  private DataSegment validate(boolean useUniquePath, String matcher, 
ServerSideEncryptingAmazonS3 s3Client) throws IOException
   {
     S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
     config.setBucket("bucket");
     config.setBaseKey("key");
-    validate(useUniquePath, matcher, s3Client, config);
+    // Default version.bin is V1 for historical reasons.
+    DataSegment segment = validate(useUniquePath, matcher, s3Client, config, 
new byte[]{0x0, 0x0, 0x0, 0x1});
+    Assert.assertEquals(1, (int) segment.getBinaryVersion());
+    return segment;
   }
 
-  private void validate(boolean useUniquePath, String matcher, 
ServerSideEncryptingAmazonS3 s3Client, S3DataSegmentPusherConfig config) throws 
IOException
+  private DataSegment validate(
+      boolean useUniquePath,
+      String matcher,
+      ServerSideEncryptingAmazonS3 s3Client,
+      S3DataSegmentPusherConfig config,
+      byte[] versionBytes
+  ) throws IOException
   {
     S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config);
 
     // Create a mock segment on disk
     File tmp = tempFolder.newFile("version.bin");
 
-    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
-    Files.write(data, tmp);
-    final long size = data.length;
+    Files.write(versionBytes, tmp);
+    final long size = versionBytes.length;
 
     DataSegment segmentToPush = new DataSegment(
             "foo",
@@ -196,7 +277,6 @@ public class S3DataSegmentPusherTest
     DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, 
useUniquePath);
 
     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
-    Assert.assertEquals(1, (int) segment.getBinaryVersion());
     Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket"));
     Assert.assertTrue(
             segment.getLoadSpec().get("key").toString(),
@@ -205,5 +285,6 @@ public class S3DataSegmentPusherTest
     Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type"));
 
     EasyMock.verify(s3Client);
+    return segment;
   }
 }
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3LoadSpecTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3LoadSpecTest.java
new file mode 100644
index 00000000000..9280cdf068c
--- /dev/null
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3LoadSpecTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+@ExtendWith(MockitoExtension.class)
+public class S3LoadSpecTest
+{
+  private static final String BUCKET = "test-bucket";
+  private static final String RAW_KEY = "path/to/segment/";
+  private static final String ZIP_KEY = "path/to/index.zip";
+
+  @Mock
+  private ServerSideEncryptingAmazonS3 s3Client;
+
+  @Test
+  public void testOpenRangeReaderReturnsReaderWhenRangeableTrue()
+  {
+    final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client), 
BUCKET, RAW_KEY, true);
+    final SegmentRangeReader reader = spec.openRangeReader();
+    assertNotNull(reader);
+    assertInstanceOf(S3SegmentRangeReader.class, reader);
+    verifyNoInteractions(s3Client);
+  }
+
+  @Test
+  public void testOpenRangeReaderReturnsNullForZipKeyEvenWhenRangeableTrue()
+  {
+    // Defensive: a zip key can't be range-read; the zip check wins over the 
flag even if hand-crafted input claims
+    // the layout is rangeable.
+    final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client), 
BUCKET, ZIP_KEY, true);
+    assertNull(spec.openRangeReader());
+    verifyNoInteractions(s3Client);
+  }
+
+  @Test
+  public void testOpenRangeReaderReturnsNullWhenRangeableFalse()
+  {
+    final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client), 
BUCKET, RAW_KEY, false);
+    assertNull(spec.openRangeReader());
+    verifyNoInteractions(s3Client);
+  }
+
+  @Test
+  public void testOpenRangeReaderReturnsNullForLegacySegmentWithoutFlag()
+  {
+    // Legacy segment (pushed before this field existed) → null flag → 
full-download path.
+    final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client), 
BUCKET, RAW_KEY, null);
+    assertNull(spec.openRangeReader());
+    verifyNoInteractions(s3Client);
+  }
+}
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3SegmentRangeReaderTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3SegmentRangeReaderTest.java
new file mode 100644
index 00000000000..b219fb79392
--- /dev/null
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3SegmentRangeReaderTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.google.common.io.ByteStreams;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class S3SegmentRangeReaderTest
+{
+  private static final String BUCKET = "test-bucket";
+  private static final String KEY_PREFIX = 
"ds/2024-01-01T00:00:00.000Z_2024-01-02T00:00:00.000Z/0/0/";
+
+  @Mock
+  private ServerSideEncryptingAmazonS3 s3Client;
+
+  private S3SegmentRangeReader reader;
+
+  @BeforeEach
+  public void setUp()
+  {
+    reader = new S3SegmentRangeReader(s3Client, BUCKET, KEY_PREFIX);
+  }
+
+  @Test
+  public void testReadRangeIssuesClosedRangeGetWithKeyPrefixPlusFilename() 
throws IOException
+  {
+    
when(s3Client.getObject(any(GetObjectRequest.Builder.class))).thenReturn(stubResponse(new
 byte[0]));
+
+    try (InputStream ignored = reader.readRange("000000.smoosh", 100, 250)) {
+      // open is performed in the RetryingInputStream constructor; the 
GetObject call should have already happened.
+    }
+
+    final GetObjectRequest request = captureRequest();
+    assertEquals(BUCKET, request.bucket());
+    assertEquals(KEY_PREFIX + "000000.smoosh", request.key());
+    // closed range: bytes=offset-(offset+length-1)
+    assertEquals("bytes=100-349", request.range());
+  }
+
+  @Test
+  public void testReadRangeBuildsDifferentKeysForDifferentFilenames() throws 
IOException
+  {
+    
when(s3Client.getObject(any(GetObjectRequest.Builder.class))).thenReturn(stubResponse(new
 byte[0]));
+
+    reader.readRange("file-a", 0, 16).close();
+    reader.readRange("file-b", 0, 16).close();
+
+    final ArgumentCaptor<GetObjectRequest.Builder> builderCaptor =
+        ArgumentCaptor.forClass(GetObjectRequest.Builder.class);
+    verify(s3Client, times(2)).getObject(builderCaptor.capture());
+    assertEquals(KEY_PREFIX + "file-a", 
builderCaptor.getAllValues().get(0).build().key());
+    assertEquals(KEY_PREFIX + "file-b", 
builderCaptor.getAllValues().get(1).build().key());
+  }
+
+  @Test
+  public void testReadRangeWithSingleByteUsesInclusiveRange() throws 
IOException
+  {
+    
when(s3Client.getObject(any(GetObjectRequest.Builder.class))).thenReturn(stubResponse(new
 byte[0]));
+
+    reader.readRange("f", 42, 1).close();
+
+    // bytes=offset-offset (length=1 → end = offset+0)
+    assertEquals("bytes=42-42", captureRequest().range());
+  }
+
+  @Test
+  public void testReadRangeWrapsNonRetryableS3ExceptionAsIOException()
+  {
+    // 403 isn't retryable per AWSClientUtil.isClientExceptionRecoverable → 
RetryingInputStream's open fails once,
+    // S3RETRY says no, the IOException(S3Exception) is propagated as-is by 
Throwables.propagateIfInstanceOf.
+    when(s3Client.getObject(any(GetObjectRequest.Builder.class)))
+        .thenThrow((S3Exception) 
S3Exception.builder().message("denied").statusCode(403).build());
+
+    final IOException thrown = assertThrows(IOException.class, () -> 
reader.readRange("f", 0, 1));
+    assertSame(S3Exception.class, thrown.getCause().getClass());
+  }
+
+  @Test
+  public void testReadRangeRetriesMidStreamFromBytesAlreadyConsumed() throws 
IOException
+  {
+    // First range read delivers the first 4 bytes then errors mid-stream with 
a retryable IOException.
+    // RetryingInputStream should reopen at the next byte (request.offset + 
4), exercising the offset math in
+    // RangeOpenFunction.open(request, start). NOTE: this test sleeps ~1s 
because RetryingInputStream's first retry
+    // uses RetryUtils.BASE_SLEEP_MILLIS exponential backoff (no 
@VisibleForTesting hook is accessible from here).
+    final byte[] firstChunk = {0x01, 0x02, 0x03, 0x04};
+    final byte[] secondChunk = {0x05, 0x06, 0x07, 0x08, 0x09, 0x0A};
+    final byte[] all = new byte[firstChunk.length + secondChunk.length];
+    System.arraycopy(firstChunk, 0, all, 0, firstChunk.length);
+    System.arraycopy(secondChunk, 0, all, firstChunk.length, 
secondChunk.length);
+
+    when(s3Client.getObject(any(GetObjectRequest.Builder.class)))
+        .thenReturn(stubResponseFailingAfter(firstChunk))
+        .thenReturn(stubResponse(secondChunk));
+
+    final byte[] read;
+    try (InputStream stream = reader.readRange("f", 100, 10)) {
+      read = ByteStreams.toByteArray(stream);
+    }
+    assertArrayEquals(all, read);
+
+    final ArgumentCaptor<GetObjectRequest.Builder> builderCaptor =
+        ArgumentCaptor.forClass(GetObjectRequest.Builder.class);
+    verify(s3Client, times(2)).getObject(builderCaptor.capture());
+    final List<GetObjectRequest> requests = Arrays.asList(
+        builderCaptor.getAllValues().get(0).build(),
+        builderCaptor.getAllValues().get(1).build()
+    );
+    // First: full requested range
+    assertEquals("bytes=100-109", requests.get(0).range());
+    // Retry: resume at (offset + bytes-already-consumed) through original end
+    assertEquals("bytes=104-109", requests.get(1).range());
+  }
+
+  @Test
+  public void testReadRangeRejectsNegativeOffset()
+  {
+    assertThrows(IllegalArgumentException.class, () -> reader.readRange("f", 
-1, 16));
+  }
+
+  @Test
+  public void 
testReadRangeReturnsEmptyStreamForZeroLengthWithoutContactingS3() throws 
IOException
+  {
+    // SegmentFileBuilderV10 allows zero-length internal-file entries; 
readRange must accept length=0 and return an
+    // empty stream without issuing an S3 GET (a closed range of bytes=N-(N-1) 
would 416).
+    try (InputStream stream = reader.readRange("f", 100, 0)) {
+      assertEquals(-1, stream.read());
+    }
+    verifyNoInteractions(s3Client);
+  }
+
+  @Test
+  public void testReadRangeRejectsNegativeLength()
+  {
+    assertThrows(IllegalArgumentException.class, () -> reader.readRange("f", 
0, -1));
+  }
+
+  @Test
+  public void testImplementsSegmentRangeReader()
+  {
+    // Compile-time and runtime guard: ensure the implements relationship 
survives refactors so callers can rely on
+    // returning S3SegmentRangeReader from openRangeReader().
+    final SegmentRangeReader downcast = reader;
+    assertSame(reader, downcast);
+  }
+
+  private GetObjectRequest captureRequest()
+  {
+    final ArgumentCaptor<GetObjectRequest.Builder> builderCaptor =
+        ArgumentCaptor.forClass(GetObjectRequest.Builder.class);
+    verify(s3Client).getObject(builderCaptor.capture());
+    return builderCaptor.getValue().build();
+  }
+
+  private static ResponseInputStream<GetObjectResponse> stubResponse(byte[] 
bytes)
+  {
+    return new ResponseInputStream<>(
+        GetObjectResponse.builder().build(),
+        new ByteArrayInputStream(bytes)
+    );
+  }
+
+  /**
+   * Returns a stream that delivers the given {@code head} bytes successfully, 
then raises a bare {@link IOException}
+   * (no cause) on the next read — which {@link S3Utils#S3RETRY} treats as 
retryable, so {@link
+   * org.apache.druid.data.input.impl.RetryingInputStream} will close this 
delegate and call the open function again
+   * with {@code start = head.length}.
+   */
+  private static ResponseInputStream<GetObjectResponse> 
stubResponseFailingAfter(byte[] head)
+  {
+    final InputStream delegate = new InputStream()
+    {
+      private final ByteArrayInputStream src = new ByteArrayInputStream(head);
+
+      @Override
+      public int read() throws IOException
+      {
+        final int b = src.read();
+        if (b == -1) {
+          throw new IOException("simulated mid-stream failure");
+        }
+        return b;
+      }
+
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException
+      {
+        final int n = src.read(b, off, len);
+        if (n == -1) {
+          throw new IOException("simulated mid-stream failure");
+        }
+        return n;
+      }
+    };
+    return new ResponseInputStream<>(GetObjectResponse.builder().build(), 
delegate);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to