This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a9cde30903 feat: Wire format and data model for partial-projection load rules (#19409)
4a9cde30903 is described below

commit 4a9cde30903e5ecf872214710faf3c8ecd9927f8
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 8 13:53:53 2026 -0700

    feat: Wire format and data model for partial-projection load rules (#19409)
    
    * add `PartialProjectionLoadSpec`, a `LoadSpec` wrapper for partial-projection loads with lazy delegate materialization
    * add `PartialLoadSpecModule` and register it in `CoreInjectorBuilder` so historicals always have the wrapper
    * add `supportsPartialLoad` capability flag to `SegmentLoadingCapabilities`; historicals (will eventually) advertise true, coordinator defaults to false for unknown servers
    * add optional `fingerprint` and `loadedBytes` wire fields to `SegmentChangeRequestLoad` for historical partial-load announcements (NON_NULL include)
    * add `PartialLoadProfile` generic data model (wrappedLoadSpec, fingerprint, loadedBytes) with `forRequest` / `forLoaded` / `forFullFallback` factories
    * add `SegmentActionHandler.replicateSegmentPartially` default throwing `UnsupportedOperationException`
    * wire `PartialLoadRule.run()` to route through `replicateSegmentPartially` when the matcher resolves; fall back to `replicateSegment` for FULL_LOAD-on-cannot-match
---
 .../segment/loading/PartialProjectionLoadSpec.java | 139 +++++++++++++
 .../loading/PartialProjectionLoadSpecTest.java     | 222 +++++++++++++++++++++
 .../apache/druid/guice/PartialLoadSpecModule.java  |  48 +++++
 .../druid/initialization/CoreInjectorBuilder.java  |   2 +
 .../coordination/SegmentChangeRequestLoad.java     |  51 ++++-
 .../coordinator/loading/PartialLoadProfile.java    | 107 ++++++++++
 .../coordinator/rules/PartialLoadMatcher.java      |   3 +-
 .../server/coordinator/rules/PartialLoadRule.java  |  19 +-
 .../coordinator/rules/SegmentActionHandler.java    |  21 ++
 .../druid/server/http/SegmentListerResource.java   |   6 +-
 .../server/http/SegmentLoadingCapabilities.java    |  27 ++-
 .../coordination/SegmentChangeRequestLoadTest.java |  76 +++++++
 .../loading/PartialLoadProfileTest.java            | 138 +++++++++++++
 .../coordinator/rules/PartialLoadRuleTest.java     |  49 ++++-
 .../http/SegmentLoadingCapabilitiesTest.java       |  28 +++
 15 files changed, 921 insertions(+), 15 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
new file mode 100644
index 00000000000..160945a8ce7
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link LoadSpec} wrapper that carries partial-projection metadata from the coordinator to a historical alongside
+ * the original backend-specific load spec. The wrapped {@code delegate} is held as a raw {@link Map} so that the
+ * concrete backend type (e.g. {@code s3}, {@code local}, {@code hdfs}) is materialized only when needed; this avoids
+ * pulling backend-specific dependencies onto every node that touches the wire form.
+ * <p>
+ * Both {@link #loadSegment(File)} and {@link #openRangeReader()} delegate verbatim to the inner load spec. The
+ * historical-side partial-load path inspects this wrapper at mount time to learn which projections to range-read and
+ * the fingerprint identifying the request the coordinator made.
+ * <p>
+ * The delegate map is converted to a {@link LoadSpec} through the injected {@link ObjectMapper} on first use and the
+ * result is memoized. {@link #equals(Object)} and {@link #hashCode()} consider only the raw delegate map, the
+ * projections and the fingerprint; the materialized delegate is not part of the comparison.
+ * <p>
+ * NOTE(review): {@code projections} is snapshotted with {@code List.copyOf}, but the {@code delegate} map is stored
+ * and returned by {@link #getDelegate()} without a defensive copy; callers should presumably treat it as read-only.
+ */
+@JsonTypeName(PartialProjectionLoadSpec.TYPE)
+public class PartialProjectionLoadSpec implements LoadSpec
+{
+  public static final String TYPE = "partialProjection";
+
+  private final Map<String, Object> delegate;
+  private final List<String> projections;
+  private final String fingerprint;
+  private final Supplier<LoadSpec> materializedDelegateSupplier;
+
+  @JsonCreator
+  public PartialProjectionLoadSpec(
+      @JsonProperty("delegate") Map<String, Object> delegate,
+      @JsonProperty("projections") List<String> projections,
+      @JsonProperty("fingerprint") String fingerprint,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+    this.delegate = Preconditions.checkNotNull(delegate, "delegate");
+    Preconditions.checkArgument(
+        !CollectionUtils.isNullOrEmpty(projections),
+        "projections must not be null or empty"
+    );
+    this.projections = List.copyOf(projections);
+    this.fingerprint = Preconditions.checkNotNull(fingerprint, "fingerprint");
+    this.materializedDelegateSupplier = Suppliers.memoize(() -> jsonMapper.convertValue(delegate, LoadSpec.class));
+  }
+
+  @JsonProperty
+  public Map<String, Object> getDelegate()
+  {
+    return delegate;
+  }
+
+  @JsonProperty
+  public List<String> getProjections()
+  {
+    return projections;
+  }
+
+  @JsonProperty
+  public String getFingerprint()
+  {
+    return fingerprint;
+  }
+
+  @Override
+  public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
+  {
+    return materializedDelegateSupplier.get().loadSegment(destDir);
+  }
+
+  @Override
+  @Nullable
+  public SegmentRangeReader openRangeReader() throws IOException
+  {
+    return materializedDelegateSupplier.get().openRangeReader();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PartialProjectionLoadSpec that = (PartialProjectionLoadSpec) o;
+    return Objects.equals(delegate, that.delegate)
+        && Objects.equals(projections, that.projections)
+        && Objects.equals(fingerprint, that.fingerprint);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(delegate, projections, fingerprint);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "PartialProjectionLoadSpec{" +
+           "delegate=" + delegate +
+           ", projections=" + projections +
+           ", fingerprint=" + fingerprint +
+           '}';
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/loading/PartialProjectionLoadSpecTest.java b/processing/src/test/java/org/apache/druid/segment/loading/PartialProjectionLoadSpecTest.java
new file mode 100644
index 00000000000..b64a0d0b7c1
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/loading/PartialProjectionLoadSpecTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PartialProjectionLoadSpecTest
+{
+  private static final Map<String, Object> DELEGATE = ImmutableMap.of(
+      "type", "stub",
+      "path", "/var/druid/segments/foo"
+  );
+  private static final String FINGERPRINT = "v1:abcdef0123456789";
+
+  private static ObjectMapper configuredMapper()
+  {
+    final ObjectMapper m = new DefaultObjectMapper();
+    final SimpleModule module = new SimpleModule();
+    module.registerSubtypes(PartialProjectionLoadSpec.class, StubLoadSpec.class);
+    m.registerModule(module);
+    m.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, m));
+    return m;
+  }
+
+  private final ObjectMapper jsonMapper = configuredMapper();
+
+  @Test
+  void testJsonRoundTrip() throws Exception
+  {
+    PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+        DELEGATE,
+        List.of("user_daily", "user_hourly"),
+        FINGERPRINT,
+        jsonMapper
+    );
+    String json = jsonMapper.writeValueAsString(spec);
+    LoadSpec reread = jsonMapper.readValue(json, LoadSpec.class);
+    Assertions.assertInstanceOf(PartialProjectionLoadSpec.class, reread);
+    Assertions.assertEquals(spec, reread);
+  }
+
+  @Test
+  void testWireFormHasPartialProjectionType() throws Exception
+  {
+    PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+        DELEGATE,
+        List.of("a"),
+        FINGERPRINT,
+        jsonMapper
+    );
+    Map<String, Object> wireForm = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(spec),
+        new TypeReference<>()
+        {
+        }
+    );
+    Assertions.assertEquals("partialProjection", wireForm.get("type"));
+    Assertions.assertEquals(DELEGATE, wireForm.get("delegate"));
+    Assertions.assertEquals(List.of("a"), wireForm.get("projections"));
+    Assertions.assertEquals(FINGERPRINT, wireForm.get("fingerprint"));
+  }
+
+  @Test
+  void testLoadSegmentDelegatesToInner() throws Exception
+  {
+    PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+        DELEGATE,
+        List.of("a"),
+        FINGERPRINT,
+        jsonMapper
+    );
+    StubLoadSpec.LOAD_CALLS.set(0);
+    LoadSpec.LoadSpecResult result = spec.loadSegment(new File("/tmp/dest"));
+    Assertions.assertEquals(1, StubLoadSpec.LOAD_CALLS.get());
+    Assertions.assertEquals(42L, result.getSize());
+  }
+
+  @Test
+  void testOpenRangeReaderDelegatesToInner() throws Exception
+  {
+    PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+        DELEGATE,
+        List.of("a"),
+        FINGERPRINT,
+        jsonMapper
+    );
+    StubLoadSpec.RANGE_CALLS.set(0);
+    SegmentRangeReader reader = spec.openRangeReader();
+    Assertions.assertNotNull(reader);
+    Assertions.assertEquals(1, StubLoadSpec.RANGE_CALLS.get());
+  }
+
+  @Test
+  void testOpenRangeReaderReturnsNullWhenInnerDoesNotSupport() throws Exception
+  {
+    PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+        ImmutableMap.of("type", "stub", "path", "/", "supportsRange", false),
+        List.of("a"),
+        FINGERPRINT,
+        jsonMapper
+    );
+    Assertions.assertNull(spec.openRangeReader());
+  }
+
+  @Test
+  void testRejectsNullDelegate()
+  {
+    Assertions.assertThrows(
+        NullPointerException.class,
+        () -> new PartialProjectionLoadSpec(null, List.of("a"), "v1:x", jsonMapper)
+    );
+  }
+
+  @Test
+  void testRejectsNullOrEmptyProjections()
+  {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new PartialProjectionLoadSpec(DELEGATE, null, "v1:x", jsonMapper)
+    );
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new PartialProjectionLoadSpec(DELEGATE, List.of(), "v1:x", jsonMapper)
+    );
+  }
+
+  @Test
+  void testRejectsNullFingerprint()
+  {
+    Assertions.assertThrows(
+        NullPointerException.class,
+        () -> new PartialProjectionLoadSpec(DELEGATE, List.of("a"), null, jsonMapper)
+    );
+  }
+
+  /**
+   * Stub LoadSpec used to verify delegation. Uses the same JSON "type"=="stub" key as the test {@link #DELEGATE}.
+   * <p>
+   * Invocations are tallied in the static {@code LOAD_CALLS} and {@code RANGE_CALLS} counters, which the delegation
+   * tests reset to zero before exercising the wrapper. {@code supportsRange} defaults to true when the property is
+   * absent; when it is false, {@link #openRangeReader()} returns null without touching {@code RANGE_CALLS}.
+   */
+  @JsonTypeName("stub")
+  public static class StubLoadSpec implements LoadSpec
+  {
+    static final AtomicInteger LOAD_CALLS = new AtomicInteger(0);
+    static final AtomicInteger RANGE_CALLS = new AtomicInteger(0);
+
+    private final String path;
+    private final boolean supportsRange;
+
+    @JsonCreator
+    public StubLoadSpec(
+        @JsonProperty("path") String path,
+        @JsonProperty("supportsRange") @Nullable Boolean supportsRange
+    )
+    {
+      this.path = path;
+      this.supportsRange = supportsRange == null || supportsRange;
+    }
+
+    @JsonProperty
+    public String getPath()
+    {
+      return path;
+    }
+
+    @JsonProperty
+    public boolean isSupportsRange()
+    {
+      return supportsRange;
+    }
+
+    @Override
+    public LoadSpecResult loadSegment(File destDir)
+    {
+      LOAD_CALLS.incrementAndGet();
+      return new LoadSpecResult(42L);
+    }
+
+    @Override
+    @Nullable
+    public SegmentRangeReader openRangeReader()
+    {
+      if (!supportsRange) {
+        return null;
+      }
+      RANGE_CALLS.incrementAndGet();
+      return (filename, offset, length) -> new ByteArrayInputStream(new byte[0]);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
new file mode 100644
index 00000000000..d1bc533269c
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.PartialProjectionLoadSpec;
+
+import java.util.List;
+
+/**
+ * Registers {@link PartialProjectionLoadSpec} as a {@link LoadSpec} subtype for serde of partial load rules. This
+ * module is added to the always-loaded core list so it is available alongside any other deep-storage load spec modules.
+ * <p>
+ * The module contributes only a Jackson subtype registration; {@link #configure(Binder)} adds no Guice bindings.
+ */
+public class PartialLoadSpecModule implements DruidModule
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    // nothing to do
+  }
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return List.of(new SimpleModule().registerSubtypes(PartialProjectionLoadSpec.class));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java 
b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
index 88cd33444c8..aaaf56923bc 100644
--- 
a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
+++ 
b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
@@ -36,6 +36,7 @@ import org.apache.druid.guice.JavaScriptModule;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.LocalDataStorageDruidModule;
 import org.apache.druid.guice.MetadataConfigModule;
+import org.apache.druid.guice.PartialLoadSpecModule;
 import org.apache.druid.guice.ServerModule;
 import org.apache.druid.guice.ServerViewModule;
 import org.apache.druid.guice.StartupLoggingModule;
@@ -122,6 +123,7 @@ public class CoreInjectorBuilder extends 
DruidInjectorBuilder
         new DerbyMetadataStorageDruidModule(),
         new JacksonConfigManagerModule(),
         new LocalDataStorageDruidModule(),
+        new PartialLoadSpecModule(),
         new TombstoneDataStorageModule(),
         new JavaScriptModule(),
         new AuthenticatorModule(),
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
index 1bb9997980c..dffeac9fb53 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordination;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonUnwrapped;
 import org.apache.druid.java.util.common.StringUtils;
@@ -29,28 +30,46 @@ import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
+ * Wire form for a coordinator load request and a historical's load 
announcement.
+ * <p>
+ * The {@code fingerprint} and {@code loadedBytes} fields are optional and 
only populated by historicals when
+ * announcing a partial load. Coordinator-issued load requests leave them 
null; partial-load metadata rides inside the
+ * wrapped {@code LoadSpec} on outbound requests instead.
  */
 public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
 {
   private final DataSegment segment;
+  @Nullable private final String fingerprint;
+  @Nullable private final Long loadedBytes;
 
   /**
    * To avoid pruning of the loadSpec on the broker, needed when the broker is 
loading broadcast segments,
-   * we deserialize into an {@link LoadableDataSegment}, which never removes 
the loadSpec.
+   * we deserialize into a {@link LoadableDataSegment}, which never removes 
the loadSpec.
    */
   @JsonCreator
   public SegmentChangeRequestLoad(
-      @JsonUnwrapped LoadableDataSegment segment
+      @JsonUnwrapped LoadableDataSegment segment,
+      @JsonProperty("fingerprint") @Nullable String fingerprint,
+      @JsonProperty("loadedBytes") @Nullable Long loadedBytes
   )
   {
-    this.segment = segment;
+    this((DataSegment) segment, fingerprint, loadedBytes);
+  }
+
+  public SegmentChangeRequestLoad(DataSegment segment)
+  {
+    this(segment, null, null);
   }
 
   public SegmentChangeRequestLoad(
-      DataSegment segment
+      DataSegment segment,
+      @Nullable String fingerprint,
+      @Nullable Long loadedBytes
   )
   {
     this.segment = segment;
+    this.fingerprint = fingerprint;
+    this.loadedBytes = loadedBytes;
   }
 
 
@@ -67,6 +86,22 @@ public class SegmentChangeRequestLoad implements 
DataSegmentChangeRequest
     return segment;
   }
 
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public String getFingerprint()
+  {
+    return fingerprint;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public Long getLoadedBytes()
+  {
+    return loadedBytes;
+  }
+
   @Override
   public String asString()
   {
@@ -83,13 +118,15 @@ public class SegmentChangeRequestLoad implements 
DataSegmentChangeRequest
       return false;
     }
     SegmentChangeRequestLoad that = (SegmentChangeRequestLoad) o;
-    return Objects.equals(segment, that.segment);
+    return Objects.equals(segment, that.segment)
+        && Objects.equals(fingerprint, that.fingerprint)
+        && Objects.equals(loadedBytes, that.loadedBytes);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(segment);
+    return Objects.hash(segment, fingerprint, loadedBytes);
   }
 
   @Override
@@ -97,6 +134,8 @@ public class SegmentChangeRequestLoad implements 
DataSegmentChangeRequest
   {
     return "SegmentChangeRequestLoad{" +
            "segment=" + segment +
+           (fingerprint != null ? ", fingerprint=" + fingerprint : "") +
+           (loadedBytes != null ? ", loadedBytes=" + loadedBytes : "") +
            '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/PartialLoadProfile.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/PartialLoadProfile.java
new file mode 100644
index 00000000000..a220c6f3fc3
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/PartialLoadProfile.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles the partial-load metadata that travels alongside a segment between the coordinator and the historicals.
+ * Generic across partial-load schemes: scheme-specific data lives inside the {@code wrappedLoadSpec} map, which
+ * matches the shape of the wire-form load-spec wrapper that gets stamped onto outbound load requests via
+ * {@code DataSegment.withLoadSpec(...)}.
+ * <p>
+ * Used in three distinct states:
+ * <ul>
+ *   <li>{@link #forRequest(Map, String) forRequest}: outbound from coordinator to historical. Carries the wrapped
+ *       load-spec map identifying what to load and the fingerprint identifying that request. {@code loadedBytes} is
+ *       null because the on-disk footprint is only known after the historical has parsed segment metadata.</li>
+ *   <li>{@link #forLoaded(Map, String, long) forLoaded}: inbound on a historical announcement after a successful
+ *       partial load. {@code wrappedLoadSpec} echoes the request and {@code loadedBytes} reports the realized
+ *       footprint.</li>
+ *   <li>{@link #forFullFallback(String, long) forFullFallback}: inbound on a historical announcement when partial
+ *       loading was requested but the historical fell back to a full download (zipped-V10 segment, capability
+ *       mismatch on a partially-upgraded server, etc.). {@code wrappedLoadSpec} is null to signal the fallback;
+ *       {@code loadedBytes} equals the full segment size. The matching fingerprint still satisfies the rule that
+ *       requested the load, so no reload-thrash occurs.</li>
+ * </ul>
+ * The fingerprint is derived from the resolved scheme-specific data (e.g., for projections, the sorted/deduped name
+ * list — see {@code ProjectionPartialLoadMatcher}) so that two rule configurations that resolve to the same set on a
+ * segment produce the same fingerprint and don't churn replicas across coordinator runs.
+ */
+public record PartialLoadProfile(
+    @Nullable Map<String, Object> wrappedLoadSpec,
+    String fingerprint,
+    @Nullable Long loadedBytes
+)
+{
+  public PartialLoadProfile
+  {
+    Objects.requireNonNull(fingerprint, "fingerprint");
+    if (wrappedLoadSpec != null) {
+      // Snapshot the map so later mutation by the caller cannot alter this profile. Map.copyOf rejects null keys
+      // and null values with a NullPointerException.
+      wrappedLoadSpec = Map.copyOf(wrappedLoadSpec);
+    }
+  }
+
+  /**
+   * Build the outbound (coordinator → historical) profile for a partial-load request.
+   * {@code wrappedLoadSpec} must be non-empty: an empty wrapped load spec is the matcher's "does not apply" signal
+   * and should never produce a request profile.
+   */
+  public static PartialLoadProfile forRequest(Map<String, Object> wrappedLoadSpec, String fingerprint)
+  {
+    if (wrappedLoadSpec == null || wrappedLoadSpec.isEmpty()) {
+      throw InvalidInput.exception("wrappedLoadSpec must not be null or empty for an outbound load request");
+    }
+    return new PartialLoadProfile(wrappedLoadSpec, fingerprint, null);
+  }
+
+  /**
+   * Build the inbound profile for a successful partial load announcement.
+   * {@code wrappedLoadSpec} must be non-null and non-empty, as for {@link #forRequest(Map, String)}.
+   */
+  public static PartialLoadProfile forLoaded(Map<String, Object> wrappedLoadSpec, String fingerprint, long loadedBytes)
+  {
+    if (wrappedLoadSpec == null || wrappedLoadSpec.isEmpty()) {
+      throw InvalidInput.exception("wrappedLoadSpec must not be null or empty for a loaded announcement");
+    }
+    return new PartialLoadProfile(wrappedLoadSpec, fingerprint, loadedBytes);
+  }
+
+  /**
+   * Build the inbound profile for a historical that was asked to partial-load but fell back to a full download.
+   * {@code wrappedLoadSpec} is null as a sentinel; {@code loadedBytes} should be the full segment size so that
+   * inventory accounting reflects actual on-disk footprint.
+   */
+  public static PartialLoadProfile forFullFallback(String fingerprint, long fullBytes)
+  {
+    return new PartialLoadProfile(null, fingerprint, fullBytes);
+  }
+
+  /**
+   * Whether this profile represents a historical's full-fallback (i.e., it was asked to partial-load but couldn't).
+   */
+  public boolean isFullFallback()
+  {
+    return wrappedLoadSpec == null;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
index fd7cfc3cc5b..b1c9c78ace1 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
@@ -52,7 +52,8 @@ public interface PartialLoadMatcher
   /**
    * Output of {@link #match(DataSegment, Map)} when the matcher applies. 
Carries the wrapped load-spec map (ready to
    * be stamped onto an outbound {@link SegmentChangeRequestLoad}) and the 
fingerprint used by the coordinator to
-   * reconcile loaded replicas against the rule that requested them.
+   * reconcile loaded replicas against the rule that requested them. 
Scheme-specific data lives inside
+   * {@code wrappedLoadSpec}; callers that need a typed view extract from the 
map themselves.
    */
   record MatchResult(Map<String, Object> wrappedLoadSpec, String fingerprint)
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
index 052389d72f4..b16942df6d5 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.rules;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.common.config.Configs;
 import org.apache.druid.error.InvalidInput;
+import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 
@@ -89,9 +90,21 @@ public abstract class PartialLoadRule extends LoadRule
   @Override
   public void run(DataSegment segment, SegmentActionHandler handler)
   {
-    // Partial plumbing is added in future work. For now, a partial rule that 
applies to a segment full-loads it,
-    // identical behavior to the corresponding non-partial rule
-    handler.replicateSegment(segment, getTieredReplicants());
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, 
segment.getLoadSpec());
+    if (result != null) {
+      // Matcher resolved: route through the partial-load handler. The 
wrappedLoadSpec map carries scheme-specific
+      // data that the historical-side wrapper deserializes.
+      handler.replicateSegmentPartially(
+          segment,
+          PartialLoadProfile.forRequest(result.wrappedLoadSpec(), 
result.fingerprint()),
+          getTieredReplicants()
+      );
+    } else {
+      // Matcher does not apply, but the rule still applies because 
onCannotMatch == FULL_LOAD (FALL_THROUGH would
+      // have caused appliesTo to return false, so run wouldn't be invoked). 
Route through the regular full-load
+      // handler.
+      handler.replicateSegment(segment, getTieredReplicants());
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
index dc76735c135..f2cb52b14b9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator.rules;
 
+import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Map;
@@ -36,6 +37,26 @@ public interface SegmentActionHandler
    */
   void replicateSegment(DataSegment segment, Map<String, Integer> 
tierToReplicaCount);
 
+  /**
+   * Like {@link #replicateSegment(DataSegment, Map)} but for a partial-load rule. The given {@code profile} carries
+   * the partial load spec and rule fingerprint that the historicals are being asked to load and that the
+   * coordinator uses to reconcile already-loaded replicas against the rule's request.
+   * <p>
+   * Default implementation throws {@link UnsupportedOperationException}: callers that don't intend to support partial
+   * load rules (e.g., minimal mock handlers used in unit tests) can leave it unimplemented. The production handler
+   * {@code StrategicSegmentAssigner} overrides this to do fingerprint-aware replica counting.
+   */
+  default void replicateSegmentPartially(
+      DataSegment segment,
+      PartialLoadProfile profile,
+      Map<String, Integer> tierToReplicaCount
+  )
+  {
+    throw new UnsupportedOperationException(
+        "replicateSegmentPartially is not supported by this 
SegmentActionHandler implementation"
+    );
+  }
+
   /**
    * Marks the given segment as unused. Unused segments are eventually unloaded
    * from all servers and deleted from metadata as well as deep storage.
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 9a402fddf3e..37fe42db8e0 100644
--- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -298,8 +298,12 @@ public class SegmentListerResource
     }
 
     SegmentLoaderConfig config = 
loadDropRequestHandler.getSegmentLoaderConfig();
+    // supportsPartialLoad is hard-coded false until the historical-side load handler actually honors the
+    // partial-load LoadSpec wrapper and announces the realized fingerprint and loadedBytes back. Until then, the
+    // wire format and coordinator routing exist but the historical performs a regular full load on any request, so
+    // advertising the capability would lie to the coordinator and cause it to thrash partial-load assignments.
     SegmentLoadingCapabilities capabilitiesResponse =
-        new SegmentLoadingCapabilities(config.getNumLoadingThreads(), 
config.getNumBootstrapThreads());
+        new SegmentLoadingCapabilities(config.getNumLoadingThreads(), 
config.getNumBootstrapThreads(), false);
 
     return 
Response.status(Response.Status.OK).entity(capabilitiesResponse).build();
   }
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
index d74101482fa..f8ca94435b8 100644
--- a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
@@ -30,15 +30,27 @@ public class SegmentLoadingCapabilities
 {
   private final int numLoadingThreads;
   private final int numTurboLoadingThreads;
+  private final boolean supportsPartialLoad;
 
   @JsonCreator
   public SegmentLoadingCapabilities(
       @JsonProperty("numLoadingThreads") int numLoadingThreads,
-      @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
+      @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads,
+      @JsonProperty("supportsPartialLoad") boolean supportsPartialLoad
   )
   {
     this.numLoadingThreads = numLoadingThreads;
     this.numTurboLoadingThreads = numTurboLoadingThreads;
+    this.supportsPartialLoad = supportsPartialLoad;
+  }
+
+  /**
+   * Convenience for callers that don't care about the partial-load capability (older code paths and tests).
+   * Defaults the capability to {@code false}.
+   */
+  public SegmentLoadingCapabilities(int numLoadingThreads, int 
numTurboLoadingThreads)
+  {
+    this(numLoadingThreads, numTurboLoadingThreads, false);
   }
 
   @JsonProperty
@@ -53,12 +65,25 @@ public class SegmentLoadingCapabilities
     return numTurboLoadingThreads;
   }
 
+  /**
+   * Whether the server understands the partial-load {@code LoadSpec} wrapper wire formats this Druid version ships
+   * with (e.g., {@code PartialProjectionLoadSpec}). Older historicals deserialize an unknown wrapper type and fail;
+   * the coordinator consults this flag and degrades partial-load requests to full-loads when it is {@code false}.
+   * Treated as a single capability across all built-in partial-load schemes since they are all core-loaded.
+   */
+  @JsonProperty
+  public boolean isSupportsPartialLoad()
+  {
+    return supportsPartialLoad;
+  }
+
   @Override
   public String toString()
   {
     return "SegmentLoadingCapabilities{" +
            "numLoadingThreads=" + numLoadingThreads +
            ", numTurboLoadingThreads=" + numTurboLoadingThreads +
+           ", supportsPartialLoad=" + supportsPartialLoad +
            '}';
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
index d165e3480f5..9667f7f7481 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
@@ -32,6 +32,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -76,4 +77,79 @@ public class SegmentChangeRequestLoadTest
     Assert.assertEquals(IndexIO.CURRENT_VERSION_ID, 
objectMap.get("binaryVersion"));
     Assert.assertEquals(1, objectMap.get("size"));
   }
+
+  @Test
+  public void testPartialLoadFieldsOmittedWhenNull() throws Exception
+  {
+    // Coordinator-issued load requests leave the partial-load fields null; the wire payload should not include them.
+    ObjectMapper mapper = new DefaultObjectMapper();
+    DataSegment segment = new DataSegment(
+        "ds",
+        Intervals.of("2024-01-01/2024-02-01"),
+        "v1",
+        Map.of("type", "local"),
+        List.of("d"),
+        List.of("m"),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+    SegmentChangeRequestLoad load = new SegmentChangeRequestLoad(segment);
+    Map<String, Object> objectMap = mapper.readValue(
+        mapper.writeValueAsString(load),
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+    Assert.assertFalse(objectMap.containsKey("fingerprint"));
+    Assert.assertFalse(objectMap.containsKey("loadedBytes"));
+  }
+
+  @Test
+  public void testPartialLoadFieldsRoundTrip() throws Exception
+  {
+    // Historical announcement with partial-load metadata: round-trip preserves both fields.
+    ObjectMapper mapper = new DefaultObjectMapper();
+    DataSegment segment = new DataSegment(
+        "ds",
+        Intervals.of("2024-01-01/2024-02-01"),
+        "v1",
+        Map.of("type", "local"),
+        List.of("d"),
+        List.of("m"),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+    SegmentChangeRequestLoad load = new SegmentChangeRequestLoad(
+        segment,
+        "v1:abcdef0123456789",
+        12345L
+    );
+    String json = mapper.writeValueAsString(load);
+    SegmentChangeRequestLoad reread = mapper.readValue(json, 
SegmentChangeRequestLoad.class);
+    Assert.assertEquals(load, reread);
+    Assert.assertEquals("v1:abcdef0123456789", reread.getFingerprint());
+    Assert.assertEquals(Long.valueOf(12345L), reread.getLoadedBytes());
+  }
+
+  @Test
+  public void testOldPayloadDeserializesWithoutPartialFields() throws Exception
+  {
+    // An old-version payload with no partial-load fields should deserialize cleanly; the partial fields are null.
+    ObjectMapper mapper = new DefaultObjectMapper();
+    DataSegment segment = new DataSegment(
+        "ds",
+        Intervals.of("2024-01-01/2024-02-01"),
+        "v1",
+        Map.of("type", "local"),
+        List.of("d"),
+        List.of("m"),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+    String oldJson = mapper.writeValueAsString(new 
SegmentChangeRequestLoad(segment));
+    SegmentChangeRequestLoad reread = mapper.readValue(oldJson, 
SegmentChangeRequestLoad.class);
+    Assert.assertNull(reread.getFingerprint());
+    Assert.assertNull(reread.getLoadedBytes());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/PartialLoadProfileTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/PartialLoadProfileTest.java
new file mode 100644
index 00000000000..37df6344581
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/PartialLoadProfileTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialLoadProfileTest
+{
+  private static final String FINGERPRINT = "v1:0123456789abcdef";
+
+  private static final Map<String, Object> WRAPPED = ImmutableMap.of(
+      "type", "partialProjection",
+      "delegate", ImmutableMap.of("type", "local", "path", 
"/var/druid/segments/foo"),
+      "projections", List.of("user_daily", "user_hourly"),
+      "fingerprint", FINGERPRINT
+  );
+
+  @Test
+  public void testForRequest()
+  {
+    PartialLoadProfile profile = PartialLoadProfile.forRequest(WRAPPED, 
FINGERPRINT);
+    Assertions.assertEquals(WRAPPED, profile.wrappedLoadSpec());
+    Assertions.assertEquals(FINGERPRINT, profile.fingerprint());
+    Assertions.assertNull(profile.loadedBytes());
+    Assertions.assertFalse(profile.isFullFallback());
+  }
+
+  @Test
+  public void testForRequestRejectsNullWrappedLoadSpec()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> PartialLoadProfile.forRequest(null, FINGERPRINT)
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("wrappedLoadSpec 
must not be null or empty")
+    );
+  }
+
+  @Test
+  public void testForRequestRejectsEmptyWrappedLoadSpec()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> PartialLoadProfile.forRequest(Map.of(), FINGERPRINT)
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("wrappedLoadSpec 
must not be null or empty")
+    );
+  }
+
+  @Test
+  public void testForLoaded()
+  {
+    PartialLoadProfile profile = PartialLoadProfile.forLoaded(WRAPPED, 
FINGERPRINT, 12345L);
+    Assertions.assertEquals(WRAPPED, profile.wrappedLoadSpec());
+    Assertions.assertEquals(FINGERPRINT, profile.fingerprint());
+    Assertions.assertEquals(12345L, profile.loadedBytes());
+    Assertions.assertFalse(profile.isFullFallback());
+  }
+
+  @Test
+  public void testForLoadedRejectsEmptyWrappedLoadSpec()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> PartialLoadProfile.forLoaded(Map.of(), FINGERPRINT, 100L)
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("wrappedLoadSpec 
must not be null or empty")
+    );
+  }
+
+  @Test
+  public void testForFullFallback()
+  {
+    PartialLoadProfile profile = 
PartialLoadProfile.forFullFallback(FINGERPRINT, 99999L);
+    Assertions.assertNull(profile.wrappedLoadSpec());
+    Assertions.assertEquals(FINGERPRINT, profile.fingerprint());
+    Assertions.assertEquals(99999L, profile.loadedBytes());
+    Assertions.assertTrue(profile.isFullFallback());
+  }
+
+  @Test
+  public void testFingerprintRequired()
+  {
+    Assertions.assertThrows(
+        NullPointerException.class,
+        () -> PartialLoadProfile.forRequest(WRAPPED, null)
+    );
+  }
+
+  @Test
+  public void testDefensiveCopyOfWrappedLoadSpec()
+  {
+    Map<String, Object> mutable = new HashMap<>();
+    mutable.put("type", "partialProjection");
+    PartialLoadProfile profile = PartialLoadProfile.forRequest(mutable, 
FINGERPRINT);
+    mutable.put("extra", "added-after");
+    Assertions.assertFalse(profile.wrappedLoadSpec().containsKey("extra"));
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(PartialLoadProfile.class)
+                  .withNonnullFields("fingerprint")
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
index f7cf0f2bc07..35113529c01 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -291,8 +292,10 @@ public class PartialLoadRuleTest
   }
 
   @Test
-  void testRunForwardsToReplicateSegment()
+  void testRunWithMatchRoutesToReplicateSegmentPartially()
   {
+    // Matcher resolves to a non-empty set on the segment, so run() routes through the partial-load handler with a
+    // PartialLoadProfile carrying the resolved set + fingerprint.
     PeriodPartialLoadRule rule = new PeriodPartialLoadRule(
         new Period("P30D"),
         false,
@@ -304,10 +307,36 @@ public class PartialLoadRuleTest
     DataSegment segment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
     RecordingHandler handler = new RecordingHandler();
     rule.run(segment, handler);
+    Assertions.assertEquals(0, handler.replicateCalls);
+    Assertions.assertEquals(1, handler.replicatePartialCalls);
+    Assertions.assertEquals(tier(2), handler.lastTierToReplicaCount);
+    Assertions.assertNotNull(handler.lastProfile);
+    Assertions.assertNotNull(handler.lastProfile.wrappedLoadSpec());
+    Assertions.assertEquals("partialProjection", 
handler.lastProfile.wrappedLoadSpec().get("type"));
+    Assertions.assertEquals(List.of("a"), 
handler.lastProfile.wrappedLoadSpec().get("projections"));
+    Assertions.assertTrue(handler.lastProfile.fingerprint().startsWith("v1:"));
+  }
+
+  @Test
+  void testRunWithFullLoadFallbackRoutesToReplicateSegment()
+  {
+    // Matcher does not apply but the rule's onCannotMatch is FULL_LOAD, so run() falls back to the regular full-load
+    // handler.
+    PeriodPartialLoadRule rule = new PeriodPartialLoadRule(
+        new Period("P30D"),
+        false,
+        tier(2),
+        null,
+        exact("nonexistent"),
+        CannotMatchBehavior.FULL_LOAD
+    );
+    DataSegment segment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+    RecordingHandler handler = new RecordingHandler();
+    rule.run(segment, handler);
     Assertions.assertEquals(1, handler.replicateCalls);
-    Assertions.assertEquals(0, handler.broadcastCalls);
-    Assertions.assertEquals(0, handler.deleteCalls);
+    Assertions.assertEquals(0, handler.replicatePartialCalls);
     Assertions.assertEquals(tier(2), handler.lastTierToReplicaCount);
+    Assertions.assertNull(handler.lastProfile);
   }
 
   @Test
@@ -363,9 +392,11 @@ public class PartialLoadRuleTest
   private static class RecordingHandler implements SegmentActionHandler
   {
     int replicateCalls;
+    int replicatePartialCalls;
     int broadcastCalls;
     int deleteCalls;
     Map<String, Integer> lastTierToReplicaCount;
+    PartialLoadProfile lastProfile;
 
     @Override
     public void replicateSegment(DataSegment segment, Map<String, Integer> 
tierToReplicaCount)
@@ -374,6 +405,18 @@ public class PartialLoadRuleTest
       lastTierToReplicaCount = tierToReplicaCount;
     }
 
+    @Override
+    public void replicateSegmentPartially(
+        DataSegment segment,
+        PartialLoadProfile profile,
+        Map<String, Integer> tierToReplicaCount
+    )
+    {
+      replicatePartialCalls++;
+      lastProfile = profile;
+      lastTierToReplicaCount = tierToReplicaCount;
+    }
+
     @Override
     public void deleteSegment(DataSegment segment)
     {
diff --git a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
index 0819439a0aa..2a21bee73c0 100644
--- a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
@@ -49,4 +49,32 @@ public class SegmentLoadingCapabilitiesTest
     Assert.assertEquals(3, reread.getNumLoadingThreads());
     Assert.assertEquals(5, reread.getNumTurboLoadingThreads());
   }
+
+  @Test
+  public void testTwoArgConstructorDefaultsPartialLoadToFalse()
+  {
+    SegmentLoadingCapabilities capabilities = new 
SegmentLoadingCapabilities(1, 4);
+    Assert.assertFalse(capabilities.isSupportsPartialLoad());
+  }
+
+  @Test
+  public void testSerdeWithPartialLoadTrue() throws Exception
+  {
+    SegmentLoadingCapabilities capabilities = new 
SegmentLoadingCapabilities(1, 4, true);
+    SegmentLoadingCapabilities reread = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(capabilities),
+        SegmentLoadingCapabilities.class
+    );
+    Assert.assertTrue(reread.isSupportsPartialLoad());
+  }
+
+  @Test
+  public void testOldPayloadDeserializesWithPartialLoadFalse() throws 
JsonProcessingException
+  {
+    // An older historical that doesn't include the new field should deserialize cleanly with the field defaulting to
+    // false. The coordinator then conservatively avoids sending wrapped LoadSpecs to that historical.
+    String json = "{\"numLoadingThreads\":3,\"numTurboLoadingThreads\":5}";
+    SegmentLoadingCapabilities reread = jsonMapper.readValue(json, 
SegmentLoadingCapabilities.class);
+    Assert.assertFalse(reread.isSupportsPartialLoad());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to