This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4a9cde30903 feat: Wire format and data model for partial-projection
load rules (#19409)
4a9cde30903 is described below
commit 4a9cde30903e5ecf872214710faf3c8ecd9927f8
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 8 13:53:53 2026 -0700
feat: Wire format and data model for partial-projection load rules (#19409)
* add `PartialProjectionLoadSpec`, a `LoadSpec` wrapper for
partial-projection loads with lazy delegate materialization
* add `PartialLoadSpecModule` and register it in `CoreInjectorBuilder` so
historicals always have the wrapper
* add `supportsPartialLoad` capability flag to
`SegmentLoadingCapabilities`; historicals (will eventually) advertise true,
coordinator defaults to false for unknown servers
* add optional `fingerprint` and `loadedBytes` wire fields to
`SegmentChangeRequestLoad` for historical partial-load announcements (NON_NULL
include)
* add `PartialLoadProfile` generic data model (wrappedLoadSpec,
fingerprint, loadedBytes) with `forRequest` / `forLoaded` / `forFullFallback`
factories
* add `SegmentActionHandler.replicateSegmentPartially` default throwing
`UnsupportedOperationException`
* wire `PartialLoadRule.run()` to route through `replicateSegmentPartially`
when the matcher resolves; fall back to `replicateSegment` for
FULL_LOAD-on-cannot-match
---
.../segment/loading/PartialProjectionLoadSpec.java | 139 +++++++++++++
.../loading/PartialProjectionLoadSpecTest.java | 222 +++++++++++++++++++++
.../apache/druid/guice/PartialLoadSpecModule.java | 48 +++++
.../druid/initialization/CoreInjectorBuilder.java | 2 +
.../coordination/SegmentChangeRequestLoad.java | 51 ++++-
.../coordinator/loading/PartialLoadProfile.java | 107 ++++++++++
.../coordinator/rules/PartialLoadMatcher.java | 3 +-
.../server/coordinator/rules/PartialLoadRule.java | 19 +-
.../coordinator/rules/SegmentActionHandler.java | 21 ++
.../druid/server/http/SegmentListerResource.java | 6 +-
.../server/http/SegmentLoadingCapabilities.java | 27 ++-
.../coordination/SegmentChangeRequestLoadTest.java | 76 +++++++
.../loading/PartialLoadProfileTest.java | 138 +++++++++++++
.../coordinator/rules/PartialLoadRuleTest.java | 49 ++++-
.../http/SegmentLoadingCapabilitiesTest.java | 28 +++
15 files changed, 921 insertions(+), 15 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
new file mode 100644
index 00000000000..160945a8ce7
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link LoadSpec} wrapper that carries partial-projection metadata from
the coordinator to a historical alongside
+ * the original backend-specific load spec. The wrapped {@code delegate} is
held as a raw {@link Map} so that the
+ * concrete backend type (e.g. {@code s3}, {@code local}, {@code hdfs}) is
materialized only when needed; this avoids
+ * pulling backend-specific dependencies onto every node that touches the wire
form.
+ * <p>
+ * Both {@link #loadSegment(File)} and {@link #openRangeReader()} delegate
verbatim to the inner load spec. The
+ * historical-side partial-load path inspects this wrapper at mount time to
learn which projections to range-read and
+ * the fingerprint identifying the request the coordinator made.
+ */
+@JsonTypeName(PartialProjectionLoadSpec.TYPE)
+public class PartialProjectionLoadSpec implements LoadSpec
+{
+ public static final String TYPE = "partialProjection";
+
+ private final Map<String, Object> delegate;
+ private final List<String> projections;
+ private final String fingerprint;
+ private final Supplier<LoadSpec> materializedDelegateSupplier;
+
+ @JsonCreator
+ public PartialProjectionLoadSpec(
+ @JsonProperty("delegate") Map<String, Object> delegate,
+ @JsonProperty("projections") List<String> projections,
+ @JsonProperty("fingerprint") String fingerprint,
+ @JacksonInject ObjectMapper jsonMapper
+ )
+ {
+ Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+ this.delegate = Preconditions.checkNotNull(delegate, "delegate");
+ Preconditions.checkArgument(
+ !CollectionUtils.isNullOrEmpty(projections),
+ "projections must not be null or empty"
+ );
+ this.projections = List.copyOf(projections);
+ this.fingerprint = Preconditions.checkNotNull(fingerprint, "fingerprint");
+ this.materializedDelegateSupplier = Suppliers.memoize(() ->
jsonMapper.convertValue(delegate, LoadSpec.class));
+ }
+
+ @JsonProperty
+ public Map<String, Object> getDelegate()
+ {
+ return delegate;
+ }
+
+ @JsonProperty
+ public List<String> getProjections()
+ {
+ return projections;
+ }
+
+ @JsonProperty
+ public String getFingerprint()
+ {
+ return fingerprint;
+ }
+
+ @Override
+ public LoadSpecResult loadSegment(File destDir) throws
SegmentLoadingException
+ {
+ return materializedDelegateSupplier.get().loadSegment(destDir);
+ }
+
+ @Override
+ @Nullable
+ public SegmentRangeReader openRangeReader() throws IOException
+ {
+ return materializedDelegateSupplier.get().openRangeReader();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartialProjectionLoadSpec that = (PartialProjectionLoadSpec) o;
+ return Objects.equals(delegate, that.delegate)
+ && Objects.equals(projections, that.projections)
+ && Objects.equals(fingerprint, that.fingerprint);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(delegate, projections, fingerprint);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PartialProjectionLoadSpec{" +
+ "delegate=" + delegate +
+ ", projections=" + projections +
+ ", fingerprint=" + fingerprint +
+ '}';
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/loading/PartialProjectionLoadSpecTest.java
b/processing/src/test/java/org/apache/druid/segment/loading/PartialProjectionLoadSpecTest.java
new file mode 100644
index 00000000000..b64a0d0b7c1
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/loading/PartialProjectionLoadSpecTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PartialProjectionLoadSpecTest
+{
+ private static final Map<String, Object> DELEGATE = ImmutableMap.of(
+ "type", "stub",
+ "path", "/var/druid/segments/foo"
+ );
+ private static final String FINGERPRINT = "v1:abcdef0123456789";
+
+ private static ObjectMapper configuredMapper()
+ {
+ final ObjectMapper m = new DefaultObjectMapper();
+ final SimpleModule module = new SimpleModule();
+ module.registerSubtypes(PartialProjectionLoadSpec.class,
StubLoadSpec.class);
+ m.registerModule(module);
+ m.setInjectableValues(new
InjectableValues.Std().addValue(ObjectMapper.class, m));
+ return m;
+ }
+
+ private final ObjectMapper jsonMapper = configuredMapper();
+
+ @Test
+ void testJsonRoundTrip() throws Exception
+ {
+ PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+ DELEGATE,
+ List.of("user_daily", "user_hourly"),
+ FINGERPRINT,
+ jsonMapper
+ );
+ String json = jsonMapper.writeValueAsString(spec);
+ LoadSpec reread = jsonMapper.readValue(json, LoadSpec.class);
+ Assertions.assertInstanceOf(PartialProjectionLoadSpec.class, reread);
+ Assertions.assertEquals(spec, reread);
+ }
+
+ @Test
+ void testWireFormHasPartialProjectionType() throws Exception
+ {
+ PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+ DELEGATE,
+ List.of("a"),
+ FINGERPRINT,
+ jsonMapper
+ );
+ Map<String, Object> wireForm = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(spec),
+ new TypeReference<>()
+ {
+ }
+ );
+ Assertions.assertEquals("partialProjection", wireForm.get("type"));
+ Assertions.assertEquals(DELEGATE, wireForm.get("delegate"));
+ Assertions.assertEquals(List.of("a"), wireForm.get("projections"));
+ Assertions.assertEquals(FINGERPRINT, wireForm.get("fingerprint"));
+ }
+
+ @Test
+ void testLoadSegmentDelegatesToInner() throws Exception
+ {
+ PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+ DELEGATE,
+ List.of("a"),
+ FINGERPRINT,
+ jsonMapper
+ );
+ StubLoadSpec.LOAD_CALLS.set(0);
+ LoadSpec.LoadSpecResult result = spec.loadSegment(new File("/tmp/dest"));
+ Assertions.assertEquals(1, StubLoadSpec.LOAD_CALLS.get());
+ Assertions.assertEquals(42L, result.getSize());
+ }
+
+ @Test
+ void testOpenRangeReaderDelegatesToInner() throws Exception
+ {
+ PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+ DELEGATE,
+ List.of("a"),
+ FINGERPRINT,
+ jsonMapper
+ );
+ StubLoadSpec.RANGE_CALLS.set(0);
+ SegmentRangeReader reader = spec.openRangeReader();
+ Assertions.assertNotNull(reader);
+ Assertions.assertEquals(1, StubLoadSpec.RANGE_CALLS.get());
+ }
+
+ @Test
+ void testOpenRangeReaderReturnsNullWhenInnerDoesNotSupport() throws Exception
+ {
+ PartialProjectionLoadSpec spec = new PartialProjectionLoadSpec(
+ ImmutableMap.of("type", "stub", "path", "/", "supportsRange", false),
+ List.of("a"),
+ FINGERPRINT,
+ jsonMapper
+ );
+ Assertions.assertNull(spec.openRangeReader());
+ }
+
+ @Test
+ void testRejectsNullDelegate()
+ {
+ Assertions.assertThrows(
+ NullPointerException.class,
+ () -> new PartialProjectionLoadSpec(null, List.of("a"), "v1:x",
jsonMapper)
+ );
+ }
+
+ @Test
+ void testRejectsNullOrEmptyProjections()
+ {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new PartialProjectionLoadSpec(DELEGATE, null, "v1:x", jsonMapper)
+ );
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new PartialProjectionLoadSpec(DELEGATE, List.of(), "v1:x",
jsonMapper)
+ );
+ }
+
+ @Test
+ void testRejectsNullFingerprint()
+ {
+ Assertions.assertThrows(
+ NullPointerException.class,
+ () -> new PartialProjectionLoadSpec(DELEGATE, List.of("a"), null,
jsonMapper)
+ );
+ }
+
+ /**
+ * Stub LoadSpec used to verify delegation. Uses the same JSON
"type"=="stub" key as the test {@link #DELEGATE}.
+ */
+ @JsonTypeName("stub")
+ public static class StubLoadSpec implements LoadSpec
+ {
+ static final AtomicInteger LOAD_CALLS = new AtomicInteger(0);
+ static final AtomicInteger RANGE_CALLS = new AtomicInteger(0);
+
+ private final String path;
+ private final boolean supportsRange;
+
+ @JsonCreator
+ public StubLoadSpec(
+ @JsonProperty("path") String path,
+ @JsonProperty("supportsRange") @Nullable Boolean supportsRange
+ )
+ {
+ this.path = path;
+ this.supportsRange = supportsRange == null || supportsRange;
+ }
+
+ @JsonProperty
+ public String getPath()
+ {
+ return path;
+ }
+
+ @JsonProperty
+ public boolean isSupportsRange()
+ {
+ return supportsRange;
+ }
+
+ @Override
+ public LoadSpecResult loadSegment(File destDir)
+ {
+ LOAD_CALLS.incrementAndGet();
+ return new LoadSpecResult(42L);
+ }
+
+ @Override
+ @Nullable
+ public SegmentRangeReader openRangeReader()
+ {
+ if (!supportsRange) {
+ return null;
+ }
+ RANGE_CALLS.incrementAndGet();
+ return (filename, offset, length) -> new ByteArrayInputStream(new
byte[0]);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
new file mode 100644
index 00000000000..d1bc533269c
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.PartialProjectionLoadSpec;
+
+import java.util.List;
+
+/**
+ * Registers {@link PartialProjectionLoadSpec} as a {@link LoadSpec} subtype
for serde of partial load rules. This
+ * module is added to the always-loaded core list so it is available alongside
any other deep-storage load spec modules.
+ */
+public class PartialLoadSpecModule implements DruidModule
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ // nothing to do
+ }
+
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return List.of(new
SimpleModule().registerSubtypes(PartialProjectionLoadSpec.class));
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
index 88cd33444c8..aaaf56923bc 100644
---
a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
+++
b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
@@ -36,6 +36,7 @@ import org.apache.druid.guice.JavaScriptModule;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.guice.MetadataConfigModule;
+import org.apache.druid.guice.PartialLoadSpecModule;
import org.apache.druid.guice.ServerModule;
import org.apache.druid.guice.ServerViewModule;
import org.apache.druid.guice.StartupLoggingModule;
@@ -122,6 +123,7 @@ public class CoreInjectorBuilder extends
DruidInjectorBuilder
new DerbyMetadataStorageDruidModule(),
new JacksonConfigManagerModule(),
new LocalDataStorageDruidModule(),
+ new PartialLoadSpecModule(),
new TombstoneDataStorageModule(),
new JavaScriptModule(),
new AuthenticatorModule(),
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
index 1bb9997980c..dffeac9fb53 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import org.apache.druid.java.util.common.StringUtils;
@@ -29,28 +30,46 @@ import javax.annotation.Nullable;
import java.util.Objects;
/**
+ * Wire form for a coordinator load request and a historical's load
announcement.
+ * <p>
+ * The {@code fingerprint} and {@code loadedBytes} fields are optional and
only populated by historicals when
+ * announcing a partial load. Coordinator-issued load requests leave them
null; partial-load metadata rides inside the
+ * wrapped {@code LoadSpec} on outbound requests instead.
*/
public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
{
private final DataSegment segment;
+ @Nullable private final String fingerprint;
+ @Nullable private final Long loadedBytes;
/**
* To avoid pruning of the loadSpec on the broker, needed when the broker is
loading broadcast segments,
- * we deserialize into an {@link LoadableDataSegment}, which never removes
the loadSpec.
+ * we deserialize into a {@link LoadableDataSegment}, which never removes
the loadSpec.
*/
@JsonCreator
public SegmentChangeRequestLoad(
- @JsonUnwrapped LoadableDataSegment segment
+ @JsonUnwrapped LoadableDataSegment segment,
+ @JsonProperty("fingerprint") @Nullable String fingerprint,
+ @JsonProperty("loadedBytes") @Nullable Long loadedBytes
)
{
- this.segment = segment;
+ this((DataSegment) segment, fingerprint, loadedBytes);
+ }
+
+ public SegmentChangeRequestLoad(DataSegment segment)
+ {
+ this(segment, null, null);
}
public SegmentChangeRequestLoad(
- DataSegment segment
+ DataSegment segment,
+ @Nullable String fingerprint,
+ @Nullable Long loadedBytes
)
{
this.segment = segment;
+ this.fingerprint = fingerprint;
+ this.loadedBytes = loadedBytes;
}
@@ -67,6 +86,22 @@ public class SegmentChangeRequestLoad implements
DataSegmentChangeRequest
return segment;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getFingerprint()
+ {
+ return fingerprint;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public Long getLoadedBytes()
+ {
+ return loadedBytes;
+ }
+
@Override
public String asString()
{
@@ -83,13 +118,15 @@ public class SegmentChangeRequestLoad implements
DataSegmentChangeRequest
return false;
}
SegmentChangeRequestLoad that = (SegmentChangeRequestLoad) o;
- return Objects.equals(segment, that.segment);
+ return Objects.equals(segment, that.segment)
+ && Objects.equals(fingerprint, that.fingerprint)
+ && Objects.equals(loadedBytes, that.loadedBytes);
}
@Override
public int hashCode()
{
- return Objects.hash(segment);
+ return Objects.hash(segment, fingerprint, loadedBytes);
}
@Override
@@ -97,6 +134,8 @@ public class SegmentChangeRequestLoad implements
DataSegmentChangeRequest
{
return "SegmentChangeRequestLoad{" +
"segment=" + segment +
+ (fingerprint != null ? ", fingerprint=" + fingerprint : "") +
+ (loadedBytes != null ? ", loadedBytes=" + loadedBytes : "") +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/PartialLoadProfile.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/PartialLoadProfile.java
new file mode 100644
index 00000000000..a220c6f3fc3
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/PartialLoadProfile.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles the partial-load metadata that travels alongside a segment between
the coordinator and the historicals.
+ * Generic across partial-load schemes: scheme-specific data lives inside the
{@code wrappedLoadSpec} map, which
+ * matches the shape of the wire-form load-spec wrapper that gets stamped onto
outbound load requests via
+ * {@code DataSegment.withLoadSpec(...)}.
+ * <p>
+ * Used in three distinct states:
+ * <ul>
+ * <li>{@link #forRequest(Map, String) forRequest}: outbound from
coordinator to historical. Carries the wrapped
+ * load-spec map identifying what to load and the fingerprint
identifying that request. {@code loadedBytes} is
+ * null because the on-disk footprint is only known after the historical
has parsed segment metadata.</li>
+ * <li>{@link #forLoaded(Map, String, long) forLoaded}: inbound on a
historical announcement after a successful
+ * partial load. {@code wrappedLoadSpec} echoes the request and {@code
loadedBytes} reports the realized
+ * footprint.</li>
+ * <li>{@link #forFullFallback(String, long) forFullFallback}: inbound on a
historical announcement when partial
+ * loading was requested but the historical fell back to a full download
(zipped-V10 segment, capability
+ * mismatch on a partially-upgraded server, etc.). {@code
wrappedLoadSpec} is null to signal the fallback;
+ * {@code loadedBytes} equals the full segment size. The matching
fingerprint still satisfies the rule that
+ * requested the load, so no reload-thrash occurs.</li>
+ * </ul>
+ * The fingerprint is derived from the resolved scheme-specific data (e.g.,
for projections, the sorted/deduped name
+ * list — see {@code ProjectionPartialLoadMatcher}) so that two rule
configurations that resolve to the same set on a
+ * segment produce the same fingerprint and don't churn replicas across
coordinator runs.
+ */
+public record PartialLoadProfile(
+ @Nullable Map<String, Object> wrappedLoadSpec,
+ String fingerprint,
+ @Nullable Long loadedBytes
+)
+{
+ public PartialLoadProfile
+ {
+ Objects.requireNonNull(fingerprint, "fingerprint");
+ if (wrappedLoadSpec != null) {
+ wrappedLoadSpec = Map.copyOf(wrappedLoadSpec);
+ }
+ }
+
+ /**
+ * Build the outbound (coordinator → historical) profile for a partial-load
request.
+ * {@code wrappedLoadSpec} must be non-empty: an empty wrapped load spec is
the matcher's "does not apply" signal
+ * and should never produce a request profile.
+ */
+ public static PartialLoadProfile forRequest(Map<String, Object>
wrappedLoadSpec, String fingerprint)
+ {
+ if (wrappedLoadSpec == null || wrappedLoadSpec.isEmpty()) {
+ throw InvalidInput.exception("wrappedLoadSpec must not be null or empty
for an outbound load request");
+ }
+ return new PartialLoadProfile(wrappedLoadSpec, fingerprint, null);
+ }
+
+ /**
+ * Build the inbound profile for a successful partial load announcement.
+ */
+ public static PartialLoadProfile forLoaded(Map<String, Object>
wrappedLoadSpec, String fingerprint, long loadedBytes)
+ {
+ if (wrappedLoadSpec == null || wrappedLoadSpec.isEmpty()) {
+ throw InvalidInput.exception("wrappedLoadSpec must not be null or empty
for a loaded announcement");
+ }
+ return new PartialLoadProfile(wrappedLoadSpec, fingerprint, loadedBytes);
+ }
+
+ /**
+ * Build the inbound profile for a historical that was asked to partial-load
but fell back to a full download.
+ * {@code wrappedLoadSpec} is null as a sentinel; {@code loadedBytes} should
be the full segment size so that
+ * inventory accounting reflects actual on-disk footprint.
+ */
+ public static PartialLoadProfile forFullFallback(String fingerprint, long
fullBytes)
+ {
+ return new PartialLoadProfile(null, fingerprint, fullBytes);
+ }
+
+ /**
+ * Whether this profile represents a historical's full-fallback (i.e., it
was asked to partial-load but couldn't).
+ */
+ public boolean isFullFallback()
+ {
+ return wrappedLoadSpec == null;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
index fd7cfc3cc5b..b1c9c78ace1 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
@@ -52,7 +52,8 @@ public interface PartialLoadMatcher
/**
* Output of {@link #match(DataSegment, Map)} when the matcher applies.
Carries the wrapped load-spec map (ready to
* be stamped onto an outbound {@link SegmentChangeRequestLoad}) and the
fingerprint used by the coordinator to
- * reconcile loaded replicas against the rule that requested them.
+ * reconcile loaded replicas against the rule that requested them.
Scheme-specific data lives inside
+ * {@code wrappedLoadSpec}; callers that need a typed view extract from the
map themselves.
*/
record MatchResult(Map<String, Object> wrappedLoadSpec, String fingerprint)
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
index 052389d72f4..b16942df6d5 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
+import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@@ -89,9 +90,21 @@ public abstract class PartialLoadRule extends LoadRule
@Override
public void run(DataSegment segment, SegmentActionHandler handler)
{
- // Partial plumbing is added in future work. For now, a partial rule that
applies to a segment full-loads it,
- // identical behavior to the corresponding non-partial rule
- handler.replicateSegment(segment, getTieredReplicants());
+ final PartialLoadMatcher.MatchResult result = matcher.match(segment,
segment.getLoadSpec());
+ if (result != null) {
+ // Matcher resolved: route through the partial-load handler. The
wrappedLoadSpec map carries scheme-specific
+ // data that the historical-side wrapper deserializes.
+ handler.replicateSegmentPartially(
+ segment,
+ PartialLoadProfile.forRequest(result.wrappedLoadSpec(),
result.fingerprint()),
+ getTieredReplicants()
+ );
+ } else {
+ // Matcher does not apply, but the rule still applies because
onCannotMatch == FULL_LOAD (FALL_THROUGH would
+ // have caused appliesTo to return false, so run wouldn't be invoked).
Route through the regular full-load
+ // handler.
+ handler.replicateSegment(segment, getTieredReplicants());
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
index dc76735c135..f2cb52b14b9 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/SegmentActionHandler.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.rules;
+import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
import org.apache.druid.timeline.DataSegment;
import java.util.Map;
@@ -36,6 +37,26 @@ public interface SegmentActionHandler
*/
void replicateSegment(DataSegment segment, Map<String, Integer>
tierToReplicaCount);
+ /**
+ * Like {@link #replicateSegment(DataSegment, Map)} but for a partial-load
rule. The given {@code profile} carries
+ * the partial load spec and rule fingerprint that the historicals are being
asked to load and that the
+ * coordinator uses to reconcile already-loaded replicas against the rule's
request.
+ * <p>
+ * Default implementation throws {@link UnsupportedOperationException}:
callers that don't intend to support partial
+ * load rules (e.g., minimal mock handlers used in unit tests) can leave it
unimplemented. The production handler
+ * {@code StrategicSegmentAssigner} overrides this to do fingerprint-aware
replica counting.
+ */
+ default void replicateSegmentPartially(
+ DataSegment segment,
+ PartialLoadProfile profile,
+ Map<String, Integer> tierToReplicaCount
+ )
+ {
+ throw new UnsupportedOperationException(
+ "replicateSegmentPartially is not supported by this
SegmentActionHandler implementation"
+ );
+ }
+
/**
* Marks the given segment as unused. Unused segments are eventually unloaded
* from all servers and deleted from metadata as well as deep storage.
diff --git
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 9a402fddf3e..37fe42db8e0 100644
---
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -298,8 +298,12 @@ public class SegmentListerResource
}
SegmentLoaderConfig config =
loadDropRequestHandler.getSegmentLoaderConfig();
+ // supportsPartialLoad is hard-coded false until the historical-side load
handler actually honors the
+ // partial-load LoadSpec wrapper and announces the realized fingerprint
and loadedBytes back. Until then, the
+ // wire format and coordinator routing exist but the historical performs a
regular full load on any request, so
+ // advertising the capability would lie to the coordinator and cause it to
thrash partial-load assignments.
SegmentLoadingCapabilities capabilitiesResponse =
- new SegmentLoadingCapabilities(config.getNumLoadingThreads(),
config.getNumBootstrapThreads());
+ new SegmentLoadingCapabilities(config.getNumLoadingThreads(),
config.getNumBootstrapThreads(), false);
return
Response.status(Response.Status.OK).entity(capabilitiesResponse).build();
}
diff --git
a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
index d74101482fa..f8ca94435b8 100644
---
a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
+++
b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
@@ -30,15 +30,27 @@ public class SegmentLoadingCapabilities
{
private final int numLoadingThreads;
private final int numTurboLoadingThreads;
+ private final boolean supportsPartialLoad;
@JsonCreator
public SegmentLoadingCapabilities(
@JsonProperty("numLoadingThreads") int numLoadingThreads,
- @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
+ @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads,
+ @JsonProperty("supportsPartialLoad") boolean supportsPartialLoad
)
{
this.numLoadingThreads = numLoadingThreads;
this.numTurboLoadingThreads = numTurboLoadingThreads;
+ this.supportsPartialLoad = supportsPartialLoad;
+ }
+
+ /**
+ * Convenience for callers that don't care about the partial-load capability
(older code paths and tests).
+ * Defaults the capability to {@code false}.
+ */
+ public SegmentLoadingCapabilities(int numLoadingThreads, int
numTurboLoadingThreads)
+ {
+ this(numLoadingThreads, numTurboLoadingThreads, false);
}
@JsonProperty
@@ -53,12 +65,25 @@ public class SegmentLoadingCapabilities
return numTurboLoadingThreads;
}
+ /**
+ * Whether the server understands the partial-load {@code LoadSpec} wrapper
wire formats this Druid version ships
+ * with (e.g., {@code PartialProjectionLoadSpec}). Older historicals
deserialize an unknown wrapper type and fail;
+ * the coordinator consults this flag and degrades partial-load requests to
full-loads when it is {@code false}.
+ * Treated as a single capability across all built-in partial-load schemes
since they are all core-loaded.
+ */
+ @JsonProperty
+ public boolean isSupportsPartialLoad()
+ {
+ return supportsPartialLoad;
+ }
+
@Override
public String toString()
{
return "SegmentLoadingCapabilities{" +
"numLoadingThreads=" + numLoadingThreads +
", numTurboLoadingThreads=" + numTurboLoadingThreads +
+ ", supportsPartialLoad=" + supportsPartialLoad +
'}';
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
index d165e3480f5..9667f7f7481 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java
@@ -32,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
/**
@@ -76,4 +77,79 @@ public class SegmentChangeRequestLoadTest
Assert.assertEquals(IndexIO.CURRENT_VERSION_ID,
objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
}
+
+ @Test
+ public void testPartialLoadFieldsOmittedWhenNull() throws Exception
+ {
+ // Coordinator-issued load requests leave the partial-load fields null;
the wire payload should not include them.
+ ObjectMapper mapper = new DefaultObjectMapper();
+ DataSegment segment = new DataSegment(
+ "ds",
+ Intervals.of("2024-01-01/2024-02-01"),
+ "v1",
+ Map.of("type", "local"),
+ List.of("d"),
+ List.of("m"),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 1
+ );
+ SegmentChangeRequestLoad load = new SegmentChangeRequestLoad(segment);
+ Map<String, Object> objectMap = mapper.readValue(
+ mapper.writeValueAsString(load),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+ Assert.assertFalse(objectMap.containsKey("fingerprint"));
+ Assert.assertFalse(objectMap.containsKey("loadedBytes"));
+ }
+
+ @Test
+ public void testPartialLoadFieldsRoundTrip() throws Exception
+ {
+ // Historical announcement with partial-load metadata: round-trip
preserves both fields.
+ ObjectMapper mapper = new DefaultObjectMapper();
+ DataSegment segment = new DataSegment(
+ "ds",
+ Intervals.of("2024-01-01/2024-02-01"),
+ "v1",
+ Map.of("type", "local"),
+ List.of("d"),
+ List.of("m"),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 1
+ );
+ SegmentChangeRequestLoad load = new SegmentChangeRequestLoad(
+ segment,
+ "v1:abcdef0123456789",
+ 12345L
+ );
+ String json = mapper.writeValueAsString(load);
+ SegmentChangeRequestLoad reread = mapper.readValue(json,
SegmentChangeRequestLoad.class);
+ Assert.assertEquals(load, reread);
+ Assert.assertEquals("v1:abcdef0123456789", reread.getFingerprint());
+ Assert.assertEquals(Long.valueOf(12345L), reread.getLoadedBytes());
+ }
+
+ @Test
+ public void testOldPayloadDeserializesWithoutPartialFields() throws Exception
+ {
+ // An old-version payload with no partial-load fields should deserialize
cleanly; the partial fields are null.
+ ObjectMapper mapper = new DefaultObjectMapper();
+ DataSegment segment = new DataSegment(
+ "ds",
+ Intervals.of("2024-01-01/2024-02-01"),
+ "v1",
+ Map.of("type", "local"),
+ List.of("d"),
+ List.of("m"),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 1
+ );
+ String oldJson = mapper.writeValueAsString(new
SegmentChangeRequestLoad(segment));
+ SegmentChangeRequestLoad reread = mapper.readValue(oldJson,
SegmentChangeRequestLoad.class);
+ Assert.assertNull(reread.getFingerprint());
+ Assert.assertNull(reread.getLoadedBytes());
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/PartialLoadProfileTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/PartialLoadProfileTest.java
new file mode 100644
index 00000000000..37df6344581
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/PartialLoadProfileTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialLoadProfileTest
+{
+ private static final String FINGERPRINT = "v1:0123456789abcdef";
+
+ private static final Map<String, Object> WRAPPED = ImmutableMap.of(
+ "type", "partialProjection",
+ "delegate", ImmutableMap.of("type", "local", "path",
"/var/druid/segments/foo"),
+ "projections", List.of("user_daily", "user_hourly"),
+ "fingerprint", FINGERPRINT
+ );
+
+ @Test
+ public void testForRequest()
+ {
+ PartialLoadProfile profile = PartialLoadProfile.forRequest(WRAPPED,
FINGERPRINT);
+ Assertions.assertEquals(WRAPPED, profile.wrappedLoadSpec());
+ Assertions.assertEquals(FINGERPRINT, profile.fingerprint());
+ Assertions.assertNull(profile.loadedBytes());
+ Assertions.assertFalse(profile.isFullFallback());
+ }
+
+ @Test
+ public void testForRequestRejectsNullWrappedLoadSpec()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ DruidException.class,
+ () -> PartialLoadProfile.forRequest(null, FINGERPRINT)
+ ),
+
DruidExceptionMatcher.invalidInput().expectMessageContains("wrappedLoadSpec
must not be null or empty")
+ );
+ }
+
+ @Test
+ public void testForRequestRejectsEmptyWrappedLoadSpec()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ DruidException.class,
+ () -> PartialLoadProfile.forRequest(Map.of(), FINGERPRINT)
+ ),
+
DruidExceptionMatcher.invalidInput().expectMessageContains("wrappedLoadSpec
must not be null or empty")
+ );
+ }
+
+ @Test
+ public void testForLoaded()
+ {
+ PartialLoadProfile profile = PartialLoadProfile.forLoaded(WRAPPED,
FINGERPRINT, 12345L);
+ Assertions.assertEquals(WRAPPED, profile.wrappedLoadSpec());
+ Assertions.assertEquals(FINGERPRINT, profile.fingerprint());
+ Assertions.assertEquals(12345L, profile.loadedBytes());
+ Assertions.assertFalse(profile.isFullFallback());
+ }
+
+ @Test
+ public void testForLoadedRejectsEmptyWrappedLoadSpec()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ DruidException.class,
+ () -> PartialLoadProfile.forLoaded(Map.of(), FINGERPRINT, 100L)
+ ),
+
DruidExceptionMatcher.invalidInput().expectMessageContains("wrappedLoadSpec
must not be null or empty")
+ );
+ }
+
+ @Test
+ public void testForFullFallback()
+ {
+ PartialLoadProfile profile =
PartialLoadProfile.forFullFallback(FINGERPRINT, 99999L);
+ Assertions.assertNull(profile.wrappedLoadSpec());
+ Assertions.assertEquals(FINGERPRINT, profile.fingerprint());
+ Assertions.assertEquals(99999L, profile.loadedBytes());
+ Assertions.assertTrue(profile.isFullFallback());
+ }
+
+ @Test
+ public void testFingerprintRequired()
+ {
+ Assertions.assertThrows(
+ NullPointerException.class,
+ () -> PartialLoadProfile.forRequest(WRAPPED, null)
+ );
+ }
+
+ @Test
+ public void testDefensiveCopyOfWrappedLoadSpec()
+ {
+ Map<String, Object> mutable = new HashMap<>();
+ mutable.put("type", "partialProjection");
+ PartialLoadProfile profile = PartialLoadProfile.forRequest(mutable,
FINGERPRINT);
+ mutable.put("extra", "added-after");
+ Assertions.assertFalse(profile.wrappedLoadSpec().containsKey("extra"));
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(PartialLoadProfile.class)
+ .withNonnullFields("fingerprint")
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
index f7cf0f2bc07..35113529c01 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -291,8 +292,10 @@ public class PartialLoadRuleTest
}
@Test
- void testRunForwardsToReplicateSegment()
+ void testRunWithMatchRoutesToReplicateSegmentPartially()
{
+ // Matcher resolves to a non-empty set on the segment, so run() routes
through the partial-load handler with a
+ // PartialLoadProfile carrying the resolved set + fingerprint.
PeriodPartialLoadRule rule = new PeriodPartialLoadRule(
new Period("P30D"),
false,
@@ -304,10 +307,36 @@ public class PartialLoadRuleTest
DataSegment segment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
RecordingHandler handler = new RecordingHandler();
rule.run(segment, handler);
+ Assertions.assertEquals(0, handler.replicateCalls);
+ Assertions.assertEquals(1, handler.replicatePartialCalls);
+ Assertions.assertEquals(tier(2), handler.lastTierToReplicaCount);
+ Assertions.assertNotNull(handler.lastProfile);
+ Assertions.assertNotNull(handler.lastProfile.wrappedLoadSpec());
+ Assertions.assertEquals("partialProjection",
handler.lastProfile.wrappedLoadSpec().get("type"));
+ Assertions.assertEquals(List.of("a"),
handler.lastProfile.wrappedLoadSpec().get("projections"));
+ Assertions.assertTrue(handler.lastProfile.fingerprint().startsWith("v1:"));
+ }
+
+ @Test
+ void testRunWithFullLoadFallbackRoutesToReplicateSegment()
+ {
+ // Matcher does not apply but the rule's onCannotMatch is FULL_LOAD, so
run() falls back to the regular full-load
+ // handler.
+ PeriodPartialLoadRule rule = new PeriodPartialLoadRule(
+ new Period("P30D"),
+ false,
+ tier(2),
+ null,
+ exact("nonexistent"),
+ CannotMatchBehavior.FULL_LOAD
+ );
+ DataSegment segment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+ RecordingHandler handler = new RecordingHandler();
+ rule.run(segment, handler);
Assertions.assertEquals(1, handler.replicateCalls);
- Assertions.assertEquals(0, handler.broadcastCalls);
- Assertions.assertEquals(0, handler.deleteCalls);
+ Assertions.assertEquals(0, handler.replicatePartialCalls);
Assertions.assertEquals(tier(2), handler.lastTierToReplicaCount);
+ Assertions.assertNull(handler.lastProfile);
}
@Test
@@ -363,9 +392,11 @@ public class PartialLoadRuleTest
private static class RecordingHandler implements SegmentActionHandler
{
int replicateCalls;
+ int replicatePartialCalls;
int broadcastCalls;
int deleteCalls;
Map<String, Integer> lastTierToReplicaCount;
+ PartialLoadProfile lastProfile;
@Override
public void replicateSegment(DataSegment segment, Map<String, Integer>
tierToReplicaCount)
@@ -374,6 +405,18 @@ public class PartialLoadRuleTest
lastTierToReplicaCount = tierToReplicaCount;
}
+ @Override
+ public void replicateSegmentPartially(
+ DataSegment segment,
+ PartialLoadProfile profile,
+ Map<String, Integer> tierToReplicaCount
+ )
+ {
+ replicatePartialCalls++;
+ lastProfile = profile;
+ lastTierToReplicaCount = tierToReplicaCount;
+ }
+
@Override
public void deleteSegment(DataSegment segment)
{
diff --git
a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
index 0819439a0aa..2a21bee73c0 100644
---
a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
@@ -49,4 +49,32 @@ public class SegmentLoadingCapabilitiesTest
Assert.assertEquals(3, reread.getNumLoadingThreads());
Assert.assertEquals(5, reread.getNumTurboLoadingThreads());
}
+
+ @Test
+ public void testTwoArgConstructorDefaultsPartialLoadToFalse()
+ {
+ SegmentLoadingCapabilities capabilities = new
SegmentLoadingCapabilities(1, 4);
+ Assert.assertFalse(capabilities.isSupportsPartialLoad());
+ }
+
+ @Test
+ public void testSerdeWithPartialLoadTrue() throws Exception
+ {
+ SegmentLoadingCapabilities capabilities = new
SegmentLoadingCapabilities(1, 4, true);
+ SegmentLoadingCapabilities reread = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(capabilities),
+ SegmentLoadingCapabilities.class
+ );
+ Assert.assertTrue(reread.isSupportsPartialLoad());
+ }
+
+ @Test
+ public void testOldPayloadDeserializesWithPartialLoadFalse() throws
JsonProcessingException
+ {
+ // An older historical that doesn't include the new field should
deserialize cleanly with the field defaulting to
+ // false. The coordinator then conservatively avoids sending wrapped
LoadSpecs to that historical.
+ String json = "{\"numLoadingThreads\":3,\"numTurboLoadingThreads\":5}";
+ SegmentLoadingCapabilities reread = jsonMapper.readValue(json,
SegmentLoadingCapabilities.class);
+ Assert.assertFalse(reread.isSupportsPartialLoad());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]