This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 174b3a7d4cc feat: clustered segments DataSegment changes and 
PartialLoadMatcher (#19462)
174b3a7d4cc is described below

commit 174b3a7d4cc64b19f563e812104c7bffe59c6b6e
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 15 01:56:23 2026 -0700

    feat: clustered segments DataSegment changes and PartialLoadMatcher (#19462)
---
 .../msq/input/table/DataSegmentWithLocation.java   |   7 +-
 ...dSpec.java => PartialClusterGroupLoadSpec.java} |  48 +--
 .../segment/loading/PartialProjectionLoadSpec.java |   5 -
 .../apache/druid/timeline/ClusterGroupTuples.java  | 181 +++++++++
 .../org/apache/druid/timeline/DataSegment.java     | 102 +++++
 .../partition/BaseDimensionRangeShardSpec.java     |   7 +-
 .../loading/PartialClusterGroupLoadSpecTest.java   | 222 ++++++++++
 .../druid/timeline/ClusterGroupTuplesTest.java     | 360 +++++++++++++++++
 .../timeline/DataSegmentClusterGroupsTest.java     | 175 ++++++++
 .../apache/druid/guice/PartialLoadSpecModule.java  |  13 +-
 .../server/coordination/LoadableDataSegment.java   |   3 +
 .../rules/ClusterGroupPartialLoadMatcher.java      |  79 ++++
 .../druid/server/coordinator/rules/Globs.java      | 182 +++++++++
 .../coordinator/rules/PartialLoadMatcher.java      |   3 +-
 .../WildcardClusterGroupPartialLoadMatcher.java    | 339 ++++++++++++++++
 .../WildcardProjectionPartialLoadMatcher.java      |  91 +----
 .../druid/client/CachingClusteredClientTest.java   |   1 +
 .../server/coordinator/CreateDataSegments.java     |   1 +
 .../druid/server/coordinator/rules/GlobsTest.java  | 144 +++++++
 ...WildcardClusterGroupPartialLoadMatcherTest.java | 449 +++++++++++++++++++++
 20 files changed, 2282 insertions(+), 130 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
index 72e765d620d..860a9d656d4 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.jackson.CommaListJoinDeserializer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.timeline.ClusterGroupTuples;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -54,8 +55,8 @@ public class DataSegmentWithLocation extends DataSegment
       @JsonProperty("dimensions") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
       List<String> dimensions,
       @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
-      @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
-      List<String> projections,
+      @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> projections,
+      @JsonProperty("clusterGroups") @Nullable ClusterGroupTuples 
clusterGroups,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lastCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -74,6 +75,7 @@ public class DataSegmentWithLocation extends DataSegment
         dimensions,
         metrics,
         projections,
+        clusterGroups,
         shardSpec,
         lastCompactionState,
         binaryVersion,
@@ -98,6 +100,7 @@ public class DataSegmentWithLocation extends DataSegment
         dataSegment.getDimensions(),
         dataSegment.getMetrics(),
         dataSegment.getProjections(),
+        dataSegment.getClusterGroups(),
         dataSegment.getShardSpec(),
         null,
         dataSegment.getBinaryVersion(),
diff --git 
a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
 
b/processing/src/main/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpec.java
similarity index 64%
copy from 
processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
copy to 
processing/src/main/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpec.java
index 07c642a2887..86e562010c0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpec.java
@@ -32,61 +32,57 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * A {@link PartialLoadSpec} that requests partial loading of a segment's 
projections. The base class carries the
- * common {@code fingerprint} and {@code delegate} wire fields; this subtype 
adds the resolved projection names that
+ * A {@link PartialLoadSpec} that requests partial loading of a clustered 
segment's cluster groups. The base class
+ * carries the common {@code fingerprint} and {@code delegate} wire fields; 
this subtype adds the resolved
+ * {@code clusterGroupIndices} (positions into {@link 
org.apache.druid.timeline.ClusterGroupTuples#tuples()}) that
  * the historical should range-read into the local segment.
- * <p>
- * The historical-side partial-load path inspects this wrapper at mount time. 
Until that path exists, the base
- * class's default {@link #loadSegment} performs a full download via the inner 
delegate, and the announcement layer
- * stamps the fingerprint + full size on the response so the coordinator's 
reconciler counts the replica as a
- * satisfying full-fallback rather than re-queuing the load.
  */
-@JsonTypeName(PartialProjectionLoadSpec.TYPE)
-public class PartialProjectionLoadSpec extends PartialLoadSpec
+@JsonTypeName(PartialClusterGroupLoadSpec.TYPE)
+public class PartialClusterGroupLoadSpec extends PartialLoadSpec
 {
-  public static final String TYPE = "partialProjection";
+  public static final String TYPE = "partialClusterGroup";
 
   /**
-   * Builds the raw wire-form {@link Map} representation of a {@link 
PartialProjectionLoadSpec} request. Used by the
+   * Builds the raw wire-form {@link Map} representation of a {@link 
PartialClusterGroupLoadSpec} request. Used by the
    * coordinator-side matcher (which doesn't instantiate the typed class 
because doing so would require plumbing an
    * {@link ObjectMapper} through every matcher just to satisfy the 
constructor's lazy-delegate supplier).
    */
   public static Map<String, Object> wireForm(
       Map<String, Object> delegate,
-      List<String> projections,
+      List<Integer> clusterGroupIndices,
       String fingerprint
   )
   {
     return Map.of(
         "type", TYPE,
         "delegate", delegate,
-        "projections", projections,
+        "clusterGroupIndices", clusterGroupIndices,
         "fingerprint", fingerprint
     );
   }
 
-  private final List<String> projections;
+  private final List<Integer> clusterGroupIndices;
 
   @JsonCreator
-  public PartialProjectionLoadSpec(
+  public PartialClusterGroupLoadSpec(
       @JsonProperty("delegate") Map<String, Object> delegate,
-      @JsonProperty("projections") List<String> projections,
+      @JsonProperty("clusterGroupIndices") List<Integer> clusterGroupIndices,
       @JsonProperty("fingerprint") String fingerprint,
       @JacksonInject ObjectMapper jsonMapper
   )
   {
     super(delegate, fingerprint, jsonMapper);
     Preconditions.checkArgument(
-        !CollectionUtils.isNullOrEmpty(projections),
-        "projections must not be null or empty"
+        !CollectionUtils.isNullOrEmpty(clusterGroupIndices),
+        "clusterGroupIndices must not be null or empty"
     );
-    this.projections = List.copyOf(projections);
+    this.clusterGroupIndices = List.copyOf(clusterGroupIndices);
   }
 
   @JsonProperty
-  public List<String> getProjections()
+  public List<Integer> getClusterGroupIndices()
   {
-    return projections;
+    return clusterGroupIndices;
   }
 
   @Override
@@ -98,24 +94,24 @@ public class PartialProjectionLoadSpec extends 
PartialLoadSpec
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    PartialProjectionLoadSpec that = (PartialProjectionLoadSpec) o;
+    PartialClusterGroupLoadSpec that = (PartialClusterGroupLoadSpec) o;
     return Objects.equals(getDelegate(), that.getDelegate())
-        && Objects.equals(projections, that.projections)
+        && Objects.equals(clusterGroupIndices, that.clusterGroupIndices)
         && Objects.equals(getFingerprint(), that.getFingerprint());
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(getDelegate(), projections, getFingerprint());
+    return Objects.hash(getDelegate(), clusterGroupIndices, getFingerprint());
   }
 
   @Override
   public String toString()
   {
-    return "PartialProjectionLoadSpec{" +
+    return "PartialClusterGroupLoadSpec{" +
            "delegate=" + getDelegate() +
-           ", projections=" + projections +
+           ", clusterGroupIndices=" + clusterGroupIndices +
            ", fingerprint=" + getFingerprint() +
            '}';
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
 
b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
index 07c642a2887..737a1065589 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java
@@ -35,11 +35,6 @@ import java.util.Objects;
  * A {@link PartialLoadSpec} that requests partial loading of a segment's 
projections. The base class carries the
  * common {@code fingerprint} and {@code delegate} wire fields; this subtype 
adds the resolved projection names that
  * the historical should range-read into the local segment.
- * <p>
- * The historical-side partial-load path inspects this wrapper at mount time. 
Until that path exists, the base
- * class's default {@link #loadSegment} performs a full download via the inner 
delegate, and the announcement layer
- * stamps the fingerprint + full size on the response so the coordinator's 
reconciler counts the replica as a
- * satisfying full-fallback rather than re-queuing the load.
  */
 @JsonTypeName(PartialProjectionLoadSpec.TYPE)
 public class PartialProjectionLoadSpec extends PartialLoadSpec
diff --git 
a/processing/src/main/java/org/apache/druid/timeline/ClusterGroupTuples.java 
b/processing/src/main/java/org/apache/druid/timeline/ClusterGroupTuples.java
new file mode 100644
index 00000000000..7b16295d3a1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/timeline/ClusterGroupTuples.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Typed clustering tuples carried on {@link DataSegment#getClusterGroups()} 
for clustered base-table segments. Each
+ * entry in {@link #tuples()} is one cluster group's clustering-column values, 
in the order declared by
+ * {@link #clusteringColumns()}. Optionally carries the clustering {@link 
VirtualColumns} when the segment was
+ * clustered on a virtual-column expression, so that matching for things like 
partial load rules and query time
+ * segment pruning can make use of this information.
+ * <p>
+ * The compact constructor validates {@code clusteringColumns}, interns the 
virtual columns through
+ * {@link DataSegment#virtualColumnInterner()}, and canonicalizes every tuple 
value to its declared
+ * {@link ColumnType} via {@link #coerceValue} so {@link Object#equals} works 
across the JSON/programmatic boundary.
+ */
+public record ClusterGroupTuples(
+    @JsonProperty("clusteringColumns") RowSignature clusteringColumns,
+    @JsonProperty("virtualColumns") 
@JsonInclude(JsonInclude.Include.NON_EMPTY) VirtualColumns virtualColumns,
+    @JsonProperty("tuples") List<List<Object>> tuples
+)
+{
+  @JsonCreator
+  public ClusterGroupTuples
+  {
+    if (clusteringColumns == null || clusteringColumns.size() == 0) {
+      throw InvalidInput.exception("clusteringColumns must not be null or 
empty");
+    }
+    virtualColumns = internVirtualColumns(virtualColumns);
+    tuples = canonicalizeTuples(clusteringColumns, tuples);
+  }
+
+  /**
+   * Convenience constructor for callers that don't carry clustering virtual 
columns. Equivalent to passing
+   * {@code null} for the virtual columns argument.
+   */
+  public ClusterGroupTuples(RowSignature clusteringColumns, @Nullable 
List<List<Object>> tuples)
+  {
+    this(clusteringColumns, null, tuples);
+  }
+
+  /**
+   * Canonicalize {@code raw} for the declared clustering column {@code type}. 
This is intentionally narrow: its job
+   * is to unbreak Jackson's number-type narrowing (e.g., an Integer arriving 
for a LONG column gets normalized to a
+   * Long), not to do general value coercion. Rules:
+   * <ul>
+   *   <li>{@code null} → {@code null}.</li>
+   *   <li>STRING: {@link Objects#toString} on any non-null value 
(stringifying numeric operator input is benign).</li>
+   *   <li>LONG / DOUBLE / FLOAT: require {@link Number}; return via the 
matching primitive accessor. Strings,
+   *       Booleans, etc. are rejected — typed rule authoring should produce 
typed JSON, and silently parsing
+   *       strings risks accepting operator typos that change the matched 
set.</li>
+   * </ul>
+   * Unsupported column types (anything that isn't STRING/LONG/DOUBLE/FLOAT) 
are rejected.
+   * <p>
+   * Used by:
+   * <ul>
+   *   <li>{@link ClusterGroupTuples}'s compact constructor to canonicalize 
segment-side tuples (strict).</li>
+   *   <li>Operator-supplied rule tuples in future cluster-group partial-load 
matchers, which can catch the
+   *       exception and treat it as "no match for this segment" rather than a 
hard failure.</li>
+   * </ul>
+   */
+  @Nullable
+  public static Object coerceValue(String columnName, ColumnType type, 
@Nullable Object raw)
+  {
+    if (raw == null) {
+      return null;
+    }
+    if (ColumnType.STRING.equals(type)) {
+      return raw instanceof String ? raw : Objects.toString(raw);
+    }
+    if (ColumnType.LONG.equals(type)) {
+      if (raw instanceof Number) {
+        return ((Number) raw).longValue();
+      }
+      throw cannotCoerce(raw, columnName, "LONG");
+    }
+    if (ColumnType.DOUBLE.equals(type)) {
+      if (raw instanceof Number) {
+        return ((Number) raw).doubleValue();
+      }
+      throw cannotCoerce(raw, columnName, "DOUBLE");
+    }
+    if (ColumnType.FLOAT.equals(type)) {
+      if (raw instanceof Number) {
+        return ((Number) raw).floatValue();
+      }
+      throw cannotCoerce(raw, columnName, "FLOAT");
+    }
+    throw InvalidInput.exception(
+        "Unsupported clustering column type [%s] for column [%s]; supported 
types are STRING, LONG, DOUBLE, FLOAT",
+        type,
+        columnName
+    );
+  }
+
+  private static VirtualColumns internVirtualColumns(@Nullable VirtualColumns 
virtualColumns)
+  {
+    if (virtualColumns == null || virtualColumns.isEmpty()) {
+      return VirtualColumns.EMPTY;
+    }
+    return VirtualColumns.create(
+        Arrays.stream(virtualColumns.getVirtualColumns())
+              .map(DataSegment.virtualColumnInterner()::intern)
+              .toList()
+    );
+  }
+
+  private static List<List<Object>> canonicalizeTuples(
+      RowSignature clusteringColumns,
+      @Nullable List<List<Object>> tuples
+  )
+  {
+    final List<List<Object>> source = tuples == null ? Collections.emptyList() 
: tuples;
+    final int numCols = clusteringColumns.size();
+    final List<List<Object>> coerced = new ArrayList<>(source.size());
+    for (int t = 0; t < source.size(); t++) {
+      final List<Object> tuple = source.get(t);
+      if (tuple == null || tuple.size() != numCols) {
+        throw InvalidInput.exception(
+            "tuple[%s] has size [%s] but clusteringColumns size is [%s]",
+            t,
+            tuple == null ? "null" : tuple.size(),
+            numCols
+        );
+      }
+      final Object[] out = new Object[numCols];
+      for (int i = 0; i < numCols; i++) {
+        final String name = clusteringColumns.getColumnName(i);
+        final ColumnType type = clusteringColumns.getColumnType(i).orElseThrow(
+            () -> InvalidInput.exception("clusteringColumn[%s] has no declared 
type", name)
+        );
+        out[i] = coerceValue(name, type, tuple.get(i));
+      }
+      coerced.add(Collections.unmodifiableList(Arrays.asList(out)));
+    }
+    return Collections.unmodifiableList(coerced);
+  }
+
+  private static DruidException cannotCoerce(Object raw, String columnName, 
String targetType)
+  {
+    return InvalidInput.exception(
+        "Cannot coerce value [%s] of type [%s] for column [%s] to %s",
+        raw,
+        raw.getClass().getName(),
+        columnName,
+        targetType
+    );
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java 
b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
index 601ea7613c5..f9375db5ae9 100644
--- a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -39,6 +39,7 @@ import org.apache.druid.jackson.CommaListJoinDeserializer;
 import org.apache.druid.jackson.CommaListJoinSerializer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
@@ -65,6 +66,18 @@ public class DataSegment implements Comparable<DataSegment>, 
Overshadowable<Data
     return STRING_INTERNER;
   }
 
+  /**
+   * Shared canonical interner for {@link 
org.apache.druid.segment.VirtualColumn} instances embedded in segment-side
+   * metadata that lives in broker/coordinator memory. Used by callers that 
carry virtual columns through
+   * {@link DataSegment}'s wire form (clustering virtual columns on {@link 
ClusterGroupTuples},
+   * {@link org.apache.druid.timeline.partition.BaseDimensionRangeShardSpec}'s 
range-clustering virtual columns) so
+   * identical VC definitions across segments collapse to a single instance in 
memory.
+   */
+  public static Interner<VirtualColumn> virtualColumnInterner()
+  {
+    return VIRTUAL_COLUMN_INTERNER;
+  }
+
   /*
    * The difference between this class and org.apache.druid.segment.Segment is 
that this class contains the segment
    * metadata only, while org.apache.druid.segment.Segment represents the 
actual body of segment data, queryable.
@@ -93,6 +106,8 @@ public class DataSegment implements Comparable<DataSegment>, 
Overshadowable<Data
   private static final Interner<List<String>> DIMENSIONS_INTERNER = 
Interners.newWeakInterner();
   private static final Interner<List<String>> METRICS_INTERNER = 
Interners.newWeakInterner();
   private static final Interner<List<String>> PROJECTIONS_INTERNER = 
Interners.newWeakInterner();
+  private static final Interner<ClusterGroupTuples> CLUSTER_GROUPS_INTERNER = 
Interners.newWeakInterner();
+  private static final Interner<VirtualColumn> VIRTUAL_COLUMN_INTERNER = 
Interners.newWeakInterner();
   private static final Interner<CompactionState> COMPACTION_STATE_INTERNER = 
Interners.newWeakInterner();
   private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
       "load spec is pruned, because it's not needed on Brokers, but eats a lot 
of heap space",
@@ -106,6 +121,13 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
   private final List<String> dimensions;
   private final List<String> metrics;
   private final List<String> projections;
+  /**
+   * Typed clustering tuples for clustered base-table segments, or {@code 
null} for non-clustered segments. Each tuple
+   * is one cluster group's clustering-column values in the order declared by
+   * {@link ClusterGroupTuples#clusteringColumns()}. Consumed by 
cluster-group partial-load matchers.
+   */
+  @Nullable
+  private final ClusterGroupTuples clusterGroups;
   private final ShardSpec shardSpec;
 
   /**
@@ -154,6 +176,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
         dimensions,
         metrics,
         null,
+        null,
         shardSpec,
         null,
         binaryVersion,
@@ -189,6 +212,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
         dimensions,
         metrics,
         null,
+        null,
         shardSpec,
         lastCompactionState,
         binaryVersion,
@@ -199,6 +223,46 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     );
   }
 
+  /**
+   * @deprecated use {@link #builder(SegmentId)} or {@link 
#builder(DataSegment)} instead.
+   */
+  @Deprecated
+  public DataSegment(
+      String dataSource,
+      Interval interval,
+      String version,
+      @Nullable Map<String, Object> loadSpec,
+      @Nullable List<String> dimensions,
+      @Nullable List<String> metrics,
+      @Nullable List<String> projections,
+      @Nullable ShardSpec shardSpec,
+      @Nullable CompactionState lastCompactionState,
+      Integer binaryVersion,
+      long size,
+      Integer totalRows,
+      String indexingStateFingerprint,
+      PruneSpecsHolder pruneSpecsHolder
+  )
+  {
+    this(
+        dataSource,
+        interval,
+        version,
+        loadSpec,
+        dimensions,
+        metrics,
+        projections,
+        null,
+        shardSpec,
+        lastCompactionState,
+        binaryVersion,
+        size,
+        totalRows,
+        indexingStateFingerprint,
+        pruneSpecsHolder
+    );
+  }
+
   @JsonCreator
   private DataSegment(
       @JsonProperty("dataSource") String dataSource,
@@ -212,6 +276,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
       @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
       List<String> projections,
+      @JsonProperty("clusterGroups") @Nullable ClusterGroupTuples 
clusterGroups,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lastCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -229,6 +294,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
         dimensions,
         metrics,
         projections,
+        clusterGroups,
         shardSpec,
         lastCompactionState,
         binaryVersion,
@@ -247,6 +313,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       @Nullable List<String> dimensions,
       @Nullable List<String> metrics,
       @Nullable List<String> projections,
+      @Nullable ClusterGroupTuples clusterGroups,
       @Nullable ShardSpec shardSpec,
       @Nullable CompactionState lastCompactionState,
       Integer binaryVersion,
@@ -264,6 +331,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     // A null value for projections means that this segment is not aware of 
projections (launched in druid 32).
     // An empty list means that this segment is projection-aware, but has no 
projections.
     this.projections = projections == null ? null : 
prepareWithInterner(projections, PROJECTIONS_INTERNER);
+    this.clusterGroups = prepareClusterGroups(clusterGroups);
     this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) : 
shardSpec;
     this.lastCompactionState = pruneSpecsHolder.pruneLastCompactionState
                                ? null
@@ -331,6 +399,14 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     return projections;
   }
 
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public ClusterGroupTuples getClusterGroups()
+  {
+    return clusterGroups;
+  }
+
   @JsonProperty
   public ShardSpec getShardSpec()
   {
@@ -461,6 +537,11 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     return builder(this).projections(projections).build();
   }
 
+  public DataSegment withClusterGroups(@Nullable ClusterGroupTuples 
clusterGroups)
+  {
+    return builder(this).clusterGroups(clusterGroups).build();
+  }
+
   public DataSegment withShardSpec(ShardSpec newSpec)
   {
     return builder(this).shardSpec(newSpec).build();
@@ -527,6 +608,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
            ", dimensions=" + dimensions +
            ", metrics=" + metrics +
            ", projections=" + projections +
+           ", clusterGroups=" + clusterGroups +
            ", shardSpec=" + shardSpec +
            ", lastCompactionState=" + lastCompactionState +
            ", size=" + size +
@@ -559,6 +641,15 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     return COMPACTION_STATE_INTERNER.intern(lastCompactionState);
   }
 
+  @Nullable
+  private static ClusterGroupTuples prepareClusterGroups(@Nullable 
ClusterGroupTuples clusterGroups)
+  {
+    if (clusterGroups == null) {
+      return null;
+    }
+    return CLUSTER_GROUPS_INTERNER.intern(clusterGroups);
+  }
+
   /**
    * Returns a list of strings with all empty strings removed and all strings 
interned.
    * <p>
@@ -606,6 +697,8 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     private List<String> dimensions;
     private List<String> metrics;
     private List<String> projections;
+    @Nullable
+    private ClusterGroupTuples clusterGroups;
     private ShardSpec shardSpec;
     private CompactionState lastCompactionState;
     private Integer binaryVersion;
@@ -651,6 +744,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       this.dimensions = segment.getDimensions();
       this.metrics = segment.getMetrics();
       this.projections = segment.getProjections();
+      this.clusterGroups = segment.getClusterGroups();
       this.shardSpec = segment.getShardSpec();
       this.lastCompactionState = segment.getLastCompactionState();
       this.binaryVersion = segment.getBinaryVersion();
@@ -668,6 +762,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       this.dimensions = segmentBuilder.dimensions;
       this.metrics = segmentBuilder.metrics;
       this.projections = segmentBuilder.projections;
+      this.clusterGroups = segmentBuilder.clusterGroups;
       this.shardSpec = segmentBuilder.shardSpec;
       this.lastCompactionState = segmentBuilder.lastCompactionState;
       this.binaryVersion = segmentBuilder.binaryVersion;
@@ -718,6 +813,12 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       return this;
     }
 
+    public Builder clusterGroups(@Nullable ClusterGroupTuples clusterGroups)
+    {
+      this.clusterGroups = clusterGroups;
+      return this;
+    }
+
     public Builder shardSpec(ShardSpec shardSpec)
     {
       this.shardSpec = shardSpec;
@@ -770,6 +871,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
           dimensions,
           metrics,
           projections,
+          clusterGroups,
           shardSpec,
           lastCompactionState,
           binaryVersion,
diff --git 
a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
 
b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
index d7a9d2018b0..36a761a0955 100644
--- 
a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
@@ -19,14 +19,11 @@
 
 package org.apache.druid.timeline.partition;
 
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
 import com.google.common.collect.Ordering;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.timeline.DataSegment;
 
@@ -37,8 +34,6 @@ import java.util.List;
 
 public abstract class BaseDimensionRangeShardSpec implements ShardSpec
 {
-  private static final Interner<VirtualColumn> VIRTUAL_COLUMN_INTERNER = 
Interners.newWeakInterner();
-
   protected final List<String> dimensions;
   protected final VirtualColumns virtualColumns;
   @Nullable
@@ -57,7 +52,7 @@ public abstract class BaseDimensionRangeShardSpec implements 
ShardSpec
     this.virtualColumns = virtualColumns == null
                           ? VirtualColumns.EMPTY
                           : 
VirtualColumns.create(Arrays.stream(virtualColumns.getVirtualColumns())
-                                                        
.map(VIRTUAL_COLUMN_INTERNER::intern)
+                                                        
.map(DataSegment.virtualColumnInterner()::intern)
                                                         .toList());
     this.start = start;
     this.end = end;
diff --git 
a/processing/src/test/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpecTest.java
 
b/processing/src/test/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpecTest.java
new file mode 100644
index 00000000000..5474617ea32
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpecTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class PartialClusterGroupLoadSpecTest
+{
+  private static final Map<String, Object> DELEGATE = ImmutableMap.of(
+      "type", "stub",
+      "path", "/var/druid/segments/foo"
+  );
+  private static final String FINGERPRINT = "v1:abcdef0123456789";
+
+  private static ObjectMapper configuredMapper()
+  {
+    final ObjectMapper m = new DefaultObjectMapper();
+    final SimpleModule module = new SimpleModule();
+    module.registerSubtypes(PartialClusterGroupLoadSpec.class, 
StubLoadSpec.class);
+    m.registerModule(module);
+    m.setInjectableValues(new 
InjectableValues.Std().addValue(ObjectMapper.class, m));
+    return m;
+  }
+
+  private final ObjectMapper jsonMapper = configuredMapper();
+
+  @Test
+  void testJsonRoundTrip() throws Exception
+  {
+    PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec(
+        DELEGATE,
+        List.of(0, 2, 5),
+        FINGERPRINT,
+        jsonMapper
+    );
+    String json = jsonMapper.writeValueAsString(spec);
+    LoadSpec reread = jsonMapper.readValue(json, LoadSpec.class);
+    Assertions.assertInstanceOf(PartialClusterGroupLoadSpec.class, reread);
+    Assertions.assertEquals(spec, reread);
+  }
+
+  @Test
+  void testWireFormHasPartialClusterGroupType() throws Exception
+  {
+    PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec(
+        DELEGATE,
+        List.of(0, 1),
+        FINGERPRINT,
+        jsonMapper
+    );
+    Map<String, Object> wireForm = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(spec),
+        new TypeReference<>()
+        {
+        }
+    );
+    Assertions.assertEquals("partialClusterGroup", wireForm.get("type"));
+    Assertions.assertEquals(DELEGATE, wireForm.get("delegate"));
+    Assertions.assertEquals(List.of(0, 1), 
wireForm.get("clusterGroupIndices"));
+    Assertions.assertEquals(FINGERPRINT, wireForm.get("fingerprint"));
+  }
+
+  @Test
+  void testLoadSegmentDelegatesToInner() throws Exception
+  {
+    PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec(
+        DELEGATE,
+        List.of(0),
+        FINGERPRINT,
+        jsonMapper
+    );
+    StubLoadSpec.LOAD_CALLS.set(0);
+    LoadSpec.LoadSpecResult result = spec.loadSegment(new File("/tmp/dest"));
+    Assertions.assertEquals(1, StubLoadSpec.LOAD_CALLS.get());
+    Assertions.assertEquals(42L, result.getSize());
+  }
+
+  @Test
+  void testOpenRangeReaderDelegatesToInner() throws Exception
+  {
+    PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec(
+        DELEGATE,
+        List.of(0),
+        FINGERPRINT,
+        jsonMapper
+    );
+    StubLoadSpec.RANGE_CALLS.set(0);
+    SegmentRangeReader reader = spec.openRangeReader();
+    Assertions.assertNotNull(reader);
+    Assertions.assertEquals(1, StubLoadSpec.RANGE_CALLS.get());
+  }
+
+  @Test
+  void testOpenRangeReaderReturnsNullWhenInnerDoesNotSupport() throws Exception
+  {
+    PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec(
+        ImmutableMap.of("type", "stub", "path", "/", "supportsRange", false),
+        List.of(0),
+        FINGERPRINT,
+        jsonMapper
+    );
+    Assertions.assertNull(spec.openRangeReader());
+  }
+
+  @Test
+  void testRejectsNullDelegate()
+  {
+    Assertions.assertThrows(
+        NullPointerException.class,
+        () -> new PartialClusterGroupLoadSpec(null, List.of(0), "v1:x", 
jsonMapper)
+    );
+  }
+
+  @Test
+  void testRejectsNullOrEmptyClusterGroupIndices()
+  {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new PartialClusterGroupLoadSpec(DELEGATE, null, "v1:x", 
jsonMapper)
+    );
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new PartialClusterGroupLoadSpec(DELEGATE, List.of(), "v1:x", 
jsonMapper)
+    );
+  }
+
+  @Test
+  void testRejectsNullFingerprint()
+  {
+    Assertions.assertThrows(
+        NullPointerException.class,
+        () -> new PartialClusterGroupLoadSpec(DELEGATE, List.of(0), null, 
jsonMapper)
+    );
+  }
+
+  /**
+   * Stub LoadSpec used to verify delegation. Uses the same JSON 
"type"=="stub" key as the test {@link #DELEGATE}.
+   */
+  @JsonTypeName("stub")
+  public static class StubLoadSpec implements LoadSpec
+  {
+    static final AtomicInteger LOAD_CALLS = new AtomicInteger(0);
+    static final AtomicInteger RANGE_CALLS = new AtomicInteger(0);
+
+    private final String path;
+    private final boolean supportsRange;
+
+    @JsonCreator
+    public StubLoadSpec(
+        @JsonProperty("path") String path,
+        @JsonProperty("supportsRange") @Nullable Boolean supportsRange
+    )
+    {
+      this.path = path;
+      this.supportsRange = supportsRange == null || supportsRange;
+    }
+
+    @JsonProperty
+    public String getPath()
+    {
+      return path;
+    }
+
+    @JsonProperty
+    public boolean isSupportsRange()
+    {
+      return supportsRange;
+    }
+
+    @Override
+    public LoadSpecResult loadSegment(File destDir)
+    {
+      LOAD_CALLS.incrementAndGet();
+      return new LoadSpecResult(42L);
+    }
+
+    @Override
+    @Nullable
+    public SegmentRangeReader openRangeReader()
+    {
+      if (!supportsRange) {
+        return null;
+      }
+      RANGE_CALLS.incrementAndGet();
+      return (filename, offset, length) -> new ByteArrayInputStream(new 
byte[0]);
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/timeline/ClusterGroupTuplesTest.java
 
b/processing/src/test/java/org/apache/druid/timeline/ClusterGroupTuplesTest.java
new file mode 100644
index 00000000000..674ad78c502
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/timeline/ClusterGroupTuplesTest.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+class ClusterGroupTuplesTest
+{
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+  private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
+      new ExpressionVirtualColumn(
+          "tenant_lower",
+          "lower(tenant)",
+          ColumnType.STRING,
+          TestExprMacroTable.INSTANCE
+      )
+  );
+
+  private static RowSignature tenantRegion()
+  {
+    return RowSignature.builder()
+                       .add("tenant", ColumnType.STRING)
+                       .add("region", ColumnType.STRING)
+                       .build();
+  }
+
+  private static RowSignature tenantPriority()
+  {
+    return RowSignature.builder()
+                       .add("tenant", ColumnType.STRING)
+                       .add("priority", ColumnType.LONG)
+                       .build();
+  }
+
+  @Test
+  void testConstructorRejectsNullClusteringColumns()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(null, List.of(List.of("acme", 
"us-east-1")))
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("clusteringColumns 
must not be null or empty")
+    );
+  }
+
+  @Test
+  void testConstructorRejectsEmptyClusteringColumns()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(RowSignature.empty(), List.of())
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("clusteringColumns 
must not be null or empty")
+    );
+  }
+
+  @Test
+  void testConstructorAllowsEmptyTuples()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), 
List.of());
+    Assertions.assertTrue(groups.tuples().isEmpty());
+  }
+
+  @Test
+  void testConstructorAllowsNullTuplesList()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), 
null);
+    Assertions.assertTrue(groups.tuples().isEmpty());
+  }
+
+  @Test
+  void testConstructorRejectsTupleLengthMismatch()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(tenantRegion(), 
List.of(List.of("acme")))
+        ),
+        DruidExceptionMatcher.invalidInput()
+                             .expectMessageContains("tuple[0] has size [1] but 
clusteringColumns size is [2]")
+    );
+  }
+
+  @Test
+  void testConstructorRejectsNullTuple()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(tenantRegion(), 
Arrays.asList(Arrays.asList("acme", "us-east-1"), null))
+        ),
+        DruidExceptionMatcher.invalidInput()
+                             .expectMessageContains("tuple[1] has size [null] 
but clusteringColumns size is [2]")
+    );
+  }
+
+  @Test
+  void testConstructorRejectsUntypedClusteringColumn()
+  {
+    final RowSignature untyped = RowSignature.builder().add("tenant", 
null).build();
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(untyped, List.of(List.of("acme")))
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("clusteringColumn[tenant]
 has no declared type")
+    );
+  }
+
+  @Test
+  void testConstructorRejectsUnsupportedColumnType()
+  {
+    final RowSignature arraySig = RowSignature.builder().add("arr", 
ColumnType.STRING_ARRAY).build();
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(arraySig, 
List.of(List.of(List.of("a"))))
+        ),
+        
DruidExceptionMatcher.invalidInput().expectMessageContains("Unsupported 
clustering column type")
+    );
+  }
+
+  @Test
+  void testNullsAllowedAtAnyTuplePosition()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(
+            Arrays.asList(null, "us-east-1"),
+            Arrays.asList("acme", null),
+            Arrays.asList(null, null)
+        )
+    );
+    Assertions.assertEquals(3, groups.tuples().size());
+    Assertions.assertNull(groups.tuples().get(0).get(0));
+    Assertions.assertNull(groups.tuples().get(1).get(1));
+    Assertions.assertNull(groups.tuples().get(2).get(0));
+    Assertions.assertNull(groups.tuples().get(2).get(1));
+  }
+
+  @Test
+  void testEqualsAndHashCodeNullSafe()
+  {
+    final ClusterGroupTuples a = new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(Arrays.asList("acme", null), Arrays.asList(null, 
"us-east-1"))
+    );
+    final ClusterGroupTuples b = new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(Arrays.asList("acme", null), Arrays.asList(null, 
"us-east-1"))
+    );
+    Assertions.assertEquals(a, b);
+    Assertions.assertEquals(a.hashCode(), b.hashCode());
+  }
+
+  @Test
+  void testCoercionIntegerToLong()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(
+        tenantPriority(),
+        List.of(List.of("acme", Integer.valueOf(5)))
+    );
+    Assertions.assertEquals(Long.class, 
groups.tuples().get(0).get(1).getClass());
+    Assertions.assertEquals(5L, groups.tuples().get(0).get(1));
+  }
+
+  @Test
+  void testCoercionStringRejectedForLong()
+  {
+    // Coercion is intentionally narrow: only Number-family inputs are 
normalized. A String numeric value is rejected
+    // rather than parsed, so operator typos don't silently broaden the 
matched set in future rule consumers.
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(tenantPriority(), 
List.of(List.of("acme", "42")))
+        ),
+        DruidExceptionMatcher.invalidInput()
+                             .expectMessageContains("Cannot coerce value [42] 
of type [java.lang.String] for column [priority] to LONG")
+    );
+  }
+
+  @Test
+  void testCoercionDoubleToFloat()
+  {
+    final RowSignature sig = RowSignature.builder().add("temp", 
ColumnType.FLOAT).build();
+    final ClusterGroupTuples groups = new ClusterGroupTuples(sig, 
List.of(List.of((Object) Double.valueOf(98.6))));
+    Assertions.assertEquals(Float.class, 
groups.tuples().get(0).get(0).getClass());
+    Assertions.assertEquals(98.6f, (Float) groups.tuples().get(0).get(0), 
0.0001f);
+  }
+
+  @Test
+  void testCoercionStringRejectedForDouble()
+  {
+    final RowSignature sig = RowSignature.builder().add("v", 
ColumnType.DOUBLE).build();
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(sig, List.of(List.of("3.14")))
+        ),
+        DruidExceptionMatcher.invalidInput()
+                             .expectMessageContains("Cannot coerce value 
[3.14] of type [java.lang.String] for column [v] to DOUBLE")
+    );
+  }
+
+  @Test
+  void testCoercionBooleanRejectedForLong()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ClusterGroupTuples(tenantPriority(), 
List.of(List.of("acme", (Object) Boolean.TRUE)))
+        ),
+        DruidExceptionMatcher.invalidInput()
+                             .expectMessageContains("Cannot coerce value 
[true] of type [java.lang.Boolean] for column [priority] to LONG")
+    );
+  }
+
+  @Test
+  void testCoercionAcceptsAnyTypeForString()
+  {
+    final RowSignature sig = RowSignature.builder().add("v", 
ColumnType.STRING).build();
+    final ClusterGroupTuples groups = new ClusterGroupTuples(sig, 
List.of(List.of((Object) Long.valueOf(7))));
+    Assertions.assertEquals("7", groups.tuples().get(0).get(0));
+  }
+
+  @Test
+  void testJsonRoundTripPreservesCoercedTypes() throws Exception
+  {
+    // Both small Integer (Jackson default) and large Long pass through 
coercion to canonical Long.
+    final ClusterGroupTuples groups = new ClusterGroupTuples(
+        tenantPriority(),
+        List.of(List.of("acme", (Object) 5), List.of("globex", (Object) 
5_000_000_000L))
+    );
+    final String json = MAPPER.writeValueAsString(groups);
+    final ClusterGroupTuples back = MAPPER.readValue(json, 
ClusterGroupTuples.class);
+    Assertions.assertEquals(groups, back);
+    // Round-tripped tuples must end up with the same canonical types as the 
in-memory original.
+    Assertions.assertEquals(Long.class, 
back.tuples().get(0).get(1).getClass());
+    Assertions.assertEquals(Long.class, 
back.tuples().get(1).get(1).getClass());
+  }
+
+  @Test
+  void testTuplesAreImmutable()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(
+        tenantRegion(),
+        List.of(List.of("acme", "us-east-1"))
+    );
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> groups.tuples().add(List.of("globex", "us-east-1"))
+    );
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> groups.tuples().get(0).set(0, "hijacked")
+    );
+  }
+
+  @Test
+  void testVirtualColumnsDefaultEmpty()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), 
List.of());
+    Assertions.assertSame(VirtualColumns.EMPTY, groups.virtualColumns());
+  }
+
+  @Test
+  void testVirtualColumnsAreStored()
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(
+        RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(),
+        VIRTUAL_COLUMNS,
+        List.of(List.of("acme"))
+    );
+    
Assertions.assertNotNull(groups.virtualColumns().getVirtualColumn("tenant_lower"));
+  }
+
+  @Test
+  void testVirtualColumnsJsonRoundTrip() throws Exception
+  {
+    final ClusterGroupTuples original = new ClusterGroupTuples(
+        RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(),
+        VIRTUAL_COLUMNS,
+        List.of(List.of("acme"))
+    );
+    // Round-trip needs an injectable ExprMacroTable for 
ExpressionVirtualColumn deserialization.
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(ExprMacroTable.class, TestExprMacroTable.INSTANCE)
+    );
+    final String json = mapper.writeValueAsString(original);
+    Assertions.assertTrue(json.contains("\"virtualColumns\""), () -> "expected 
virtualColumns in JSON: " + json);
+    final ClusterGroupTuples back = mapper.readValue(json, 
ClusterGroupTuples.class);
+    Assertions.assertEquals(original, back);
+  }
+
+  @Test
+  void testVirtualColumnsOmittedFromJsonWhenEmpty() throws Exception
+  {
+    final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), 
List.of(List.of("acme", "us-east-1")));
+    final String json = MAPPER.writeValueAsString(groups);
+    Assertions.assertFalse(json.contains("virtualColumns"), () -> "did not 
expect virtualColumns in JSON: " + json);
+  }
+
+  @Test
+  void testVirtualColumnInternerSharesAcrossInstances()
+  {
+    // Build each ClusterGroupTuples from its own freshly constructed (but equal) VC so the assertion can only pass
+    // via the shared interner on DataSegment; reusing VIRTUAL_COLUMNS for both would make assertSame trivially true.
+    final RowSignature sig = RowSignature.builder().add("tenant_lower", ColumnType.STRING).build();
+    final VirtualColumns first = VirtualColumns.create(
+        new ExpressionVirtualColumn("tenant_lower", "lower(tenant)", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+    final VirtualColumns second = VirtualColumns.create(
+        new ExpressionVirtualColumn("tenant_lower", "lower(tenant)", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+    Assertions.assertNotSame(first.getVirtualColumns()[0], second.getVirtualColumns()[0]);
+    final ClusterGroupTuples a = new ClusterGroupTuples(sig, first, List.of(List.of("acme")));
+    final ClusterGroupTuples b = new ClusterGroupTuples(sig, second, List.of(List.of("globex")));
+    Assertions.assertSame(
+        a.virtualColumns().getVirtualColumns()[0],
+        b.virtualColumns().getVirtualColumns()[0]
+    );
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/timeline/DataSegmentClusterGroupsTest.java
 
b/processing/src/test/java/org/apache/druid/timeline/DataSegmentClusterGroupsTest.java
new file mode 100644
index 00000000000..a0f54db1865
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/timeline/DataSegmentClusterGroupsTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Coverage for {@link DataSegment#getClusterGroups()} and the associated 
builder / interner plumbing.
+ */
+class DataSegmentClusterGroupsTest
+{
+  private final ObjectMapper mapper = new DefaultObjectMapper();
+
+  @BeforeEach
+  void setUp()
+  {
+    final InjectableValues.Std injectables = new InjectableValues.Std();
+    injectables.addValue(DataSegment.PruneSpecsHolder.class, 
DataSegment.PruneSpecsHolder.DEFAULT);
+    mapper.setInjectableValues(injectables);
+  }
+
+  private static RowSignature tenantRegion()
+  {
+    return RowSignature.builder()
+                       .add("tenant", ColumnType.STRING)
+                       .add("region", ColumnType.STRING)
+                       .build();
+  }
+
+  private static ClusterGroupTuples sampleGroups()
+  {
+    return new ClusterGroupTuples(
+        tenantRegion(),
+        List.of(List.of("acme", "us-east-1"), List.of("globex", "us-east-1"))
+    );
+  }
+
+  private DataSegment.Builder baseBuilder()
+  {
+    return DataSegment.builder(SegmentId.of(
+        "ds",
+        Intervals.of("2026-01-01/2026-01-02"),
+        "v",
+        new NumberedShardSpec(0, 1)
+    )).size(1L);
+  }
+
+  @Test
+  void testNullByDefault()
+  {
+    final DataSegment segment = baseBuilder().build();
+    Assertions.assertNull(segment.getClusterGroups());
+  }
+
+  @Test
+  void testBuilderAttachesClusterGroups()
+  {
+    final ClusterGroupTuples groups = sampleGroups();
+    final DataSegment segment = baseBuilder().clusterGroups(groups).build();
+    Assertions.assertEquals(groups, segment.getClusterGroups());
+  }
+
+  @Test
+  void testBuilderCopyPreservesClusterGroups()
+  {
+    final ClusterGroupTuples groups = sampleGroups();
+    final DataSegment original = baseBuilder().clusterGroups(groups).build();
+    final DataSegment copy = DataSegment.builder(original).build();
+    Assertions.assertSame(original.getClusterGroups(), 
copy.getClusterGroups());
+  }
+
+  @Test
+  void testWithClusterGroupsCanClearField()
+  {
+    final DataSegment original = 
baseBuilder().clusterGroups(sampleGroups()).build();
+    final DataSegment cleared = original.withClusterGroups(null);
+    Assertions.assertNull(cleared.getClusterGroups());
+  }
+
+  @Test
+  void testJsonRoundTripWithClusterGroups() throws Exception
+  {
+    final DataSegment segment = 
baseBuilder().clusterGroups(sampleGroups()).build();
+    final String json = mapper.writeValueAsString(segment);
+    Assertions.assertTrue(json.contains("\"clusterGroups\""), () -> "expected 
clusterGroups in JSON: " + json);
+    final DataSegment back = mapper.readValue(json, DataSegment.class);
+    Assertions.assertEquals(segment.getClusterGroups(), 
back.getClusterGroups());
+  }
+
+  @Test
+  void testJsonRoundTripOmitsNullClusterGroups() throws Exception
+  {
+    final DataSegment segment = baseBuilder().build();
+    final String json = mapper.writeValueAsString(segment);
+    Assertions.assertFalse(json.contains("clusterGroups"), () -> "did not 
expect clusterGroups in JSON: " + json);
+    final DataSegment back = mapper.readValue(json, DataSegment.class);
+    Assertions.assertNull(back.getClusterGroups());
+  }
+
+  @Test
+  void testOlderJsonWithoutFieldDeserializes() throws Exception
+  {
+    // A representative pre-clusterGroups JSON shape; the field is simply absent. Older writers (and segments published
+    // before the clusterGroups field existed) never emit it.
+    final String legacyJson =
+        "{\"dataSource\":\"ds\","
+        + "\"interval\":\"2026-01-01T00:00:00.000Z/2026-01-02T00:00:00.000Z\","
+        + "\"version\":\"v\","
+        + "\"loadSpec\":{},"
+        + "\"dimensions\":\"\",\"metrics\":\"\","
+        + 
"\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},"
+        + "\"binaryVersion\":0,\"size\":1}";
+    final DataSegment back = mapper.readValue(legacyJson, DataSegment.class);
+    Assertions.assertNull(back.getClusterGroups());
+  }
+
+  @Test
+  void testInternerReturnsSameInstanceAcrossSegments()
+  {
+    final DataSegment a = baseBuilder().clusterGroups(sampleGroups()).build();
+    // sampleGroups() builds a fresh instance each call; the interner should 
dedupe identical content.
+    final DataSegment b = DataSegment.builder(SegmentId.of(
+        "ds-other",
+        Intervals.of("2026-01-01/2026-01-02"),
+        "v",
+        new NumberedShardSpec(0, 1)
+    )).size(1L).clusterGroups(sampleGroups()).build();
+    Assertions.assertSame(a.getClusterGroups(), b.getClusterGroups());
+  }
+
+  @Test
+  void testInternerDistinguishesDifferentContent()
+  {
+    final DataSegment a = baseBuilder().clusterGroups(sampleGroups()).build();
+    final ClusterGroupTuples otherGroups = new ClusterGroupTuples(
+        tenantRegion(),
+        List.of(List.of("globex", "us-west-2"))
+    );
+    final DataSegment b = DataSegment.builder(SegmentId.of(
+        "ds-other",
+        Intervals.of("2026-01-01/2026-01-02"),
+        "v",
+        new NumberedShardSpec(0, 1)
+    )).size(1L).clusterGroups(otherGroups).build();
+    Assertions.assertNotSame(a.getClusterGroups(), b.getClusterGroups());
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java 
b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
index d1bc533269c..d60ca44c12f 100644
--- a/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
+++ b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java
@@ -24,13 +24,15 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.PartialClusterGroupLoadSpec;
 import org.apache.druid.segment.loading.PartialProjectionLoadSpec;
 
 import java.util.List;
 
 /**
- * Registers {@link PartialProjectionLoadSpec} as a {@link LoadSpec} subtype 
for serde of partial load rules. This
- * module is added to the always-loaded core list so it is available alongside 
any other deep-storage load spec modules.
+ * Registers {@link PartialProjectionLoadSpec} and {@link 
PartialClusterGroupLoadSpec} as {@link LoadSpec} subtypes
+ * for serde of partial load rules. This module is added to the always-loaded 
core list so they are available
+ * alongside any other deep-storage load spec modules.
  */
 public class PartialLoadSpecModule implements DruidModule
 {
@@ -43,6 +45,11 @@ public class PartialLoadSpecModule implements DruidModule
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return List.of(new 
SimpleModule().registerSubtypes(PartialProjectionLoadSpec.class));
+    return List.of(
+        new SimpleModule().registerSubtypes(
+            PartialProjectionLoadSpec.class,
+            PartialClusterGroupLoadSpec.class
+        )
+    );
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
 
b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
index 427c670cf4c..d853c675890 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.druid.jackson.CommaListJoinDeserializer;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.ClusterGroupTuples;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -50,6 +51,7 @@ public class LoadableDataSegment extends DataSegment
       @JsonProperty("dimensions") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> dimensions,
       @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
       @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> projections,
+      @JsonProperty("clusterGroups") @Nullable ClusterGroupTuples 
clusterGroups,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lastCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -66,6 +68,7 @@ public class LoadableDataSegment extends DataSegment
         dimensions,
         metrics,
         projections,
+        clusterGroups,
         shardSpec,
         lastCompactionState,
         binaryVersion,
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/ClusterGroupPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ClusterGroupPartialLoadMatcher.java
new file mode 100644
index 00000000000..2dd50818def
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ClusterGroupPartialLoadMatcher.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.segment.loading.PartialClusterGroupLoadSpec;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Base for {@link PartialLoadMatcher} implementations that decide which of a 
clustered segment's cluster groups to
+ * partially load. Subclasses supply the resolution policy via {@link 
#resolveClusterGroupIndices(DataSegment)}, which
+ * returns the sorted, deduped indices into {@code 
segment.getClusterGroups().tuples()}; this base handles fingerprint
+ * computation and wraps the result into the {@code partialClusterGroup} 
load-spec wire form consumed by the
+ * historical-side partial loader.
+ * <p>
+ * The fingerprint is a hash of the resolved indices for a segment; the data 
node includes this value in the segment
+ * announcement so the coordinator can detect rule changes between runs and 
reconcile loaded replicas.
+ */
+public abstract class ClusterGroupPartialLoadMatcher implements 
PartialLoadMatcher
+{
+  static final String FINGERPRINT_VERSION = "v1";
+
+  /**
+   * Returns the sorted, deduped list of indices into {@code 
segment.getClusterGroups().tuples()} selected by this
+   * matcher. Returns an empty list when nothing matches (the segment is not 
clustered, or no configured pattern /
+   * tuple intersects what the segment has).
+   */
+  protected abstract List<Integer> resolveClusterGroupIndices(DataSegment 
segment);
+
+  @Override
+  @Nullable
+  public MatchResult match(DataSegment segment, Map<String, Object> 
baseLoadSpec)
+  {
+    if (segment.getClusterGroups() == null) {
+      return null;
+    }
+    final List<Integer> resolved = resolveClusterGroupIndices(segment);
+    if (resolved.isEmpty()) {
+      return null;
+    }
+    final String fingerprint = computeFingerprint(resolved);
+    return new MatchResult(PartialClusterGroupLoadSpec.wireForm(baseLoadSpec, 
resolved, fingerprint), fingerprint);
+  }
+
+  static String computeFingerprint(List<Integer> sortedDedupedIndices)
+  {
+    final Hasher hasher = Hashing.sha256().newHasher();
+    for (Integer idx : sortedDedupedIndices) {
+      hasher.putInt(idx);
+    }
+    final String hex = 
BaseEncoding.base16().encode(hasher.hash().asBytes()).toLowerCase(Locale.ROOT);
+    // truncate to 16 hex chars (64 bits): enough to detect changes without 
carrying the full digest for every segment
+    return FINGERPRINT_VERSION + ":" + hex.substring(0, 16);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/Globs.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/Globs.java
new file mode 100644
index 00000000000..b0f81b98a56
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Globs.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Shared glob compilation + matching helpers used by partial-load matchers 
that match operator-supplied glob patterns
+ * against strings (projection names, stringified cluster-group values, etc.). 
Supported metacharacters:
+ * <ul>
+ *   <li>{@code *} — any sequence of characters (including empty)</li>
+ *   <li>{@code ?} — any single character</li>
+ *   <li>{@code \} — escapes the following character so it is treated 
literally; use {@code \*}, {@code \?}, or
+ *       {@code \\} to match a literal {@code *}, {@code ?}, or {@code \}. A 
trailing unescaped {@code \} is
+ *       rejected.</li>
+ * </ul>
+ * Other characters are literal; regex metacharacters in literal positions are 
escaped automatically.
+ */
+public final class Globs
+{
+  private Globs()
+  {
+    // no instantiation
+  }
+
+  /**
+   * Translates a glob pattern into a regex pattern that matches the entire 
input string.
+   *
+   * @throws org.apache.druid.error.DruidException if {@code glob} ends with 
an unescaped backslash
+   */
+  public static String globToRegex(String glob)
+  {
+    final StringBuilder sb = new StringBuilder(glob.length() + 4);
+    boolean escaping = false;
+    for (int i = 0; i < glob.length(); i++) {
+      final char c = glob.charAt(i);
+      if (escaping) {
+        appendLiteral(sb, c);
+        escaping = false;
+        continue;
+      }
+      switch (c) {
+        case '\\':
+          escaping = true;
+          break;
+        case '*':
+          sb.append(".*");
+          break;
+        case '?':
+          sb.append('.');
+          break;
+        default:
+          appendLiteral(sb, c);
+      }
+    }
+    if (escaping) {
+      throw InvalidInput.exception("Glob pattern [%s] ends with an unescaped 
backslash", glob);
+    }
+    return sb.toString();
+  }
+
+  public static List<Pattern> compileAll(List<String> globs)
+  {
+    if (globs.isEmpty()) {
+      return List.of();
+    }
+    final List<Pattern> compiled = new ArrayList<>(globs.size());
+    for (String glob : globs) {
+      compiled.add(Pattern.compile(globToRegex(glob)));
+    }
+    return List.copyOf(compiled);
+  }
+
+  public static boolean matchesAny(String name, List<Pattern> patterns)
+  {
+    for (Pattern pattern : patterns) {
+      if (pattern.matcher(name).matches()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Compile a glob, special-casing the literal {@code "*"} to the {@link 
CompiledGlob#MATCH_ANY} marker that matches any
+   * value, including null.
+   */
+  public static CompiledGlob compile(String glob)
+  {
+    if ("*".equals(glob)) {
+      return CompiledGlob.MATCH_ANY;
+    }
+    return new CompiledGlob(false, Pattern.compile(globToRegex(glob)));
+  }
+
+  private static void appendLiteral(StringBuilder sb, char c)
+  {
+    switch (c) {
+      case '.':
+      case '(':
+      case ')':
+      case '[':
+      case ']':
+      case '{':
+      case '}':
+      case '+':
+      case '|':
+      case '^':
+      case '$':
+      case '\\':
+      case '*':
+      case '?':
+        sb.append('\\').append(c);
+        break;
+      default:
+        sb.append(c);
+    }
+  }
+
+  /**
+   * Compiled, per-column glob. The literal {@code "*"} short-circuits to a 
"match any value, including null" flag;
+   * any other glob compiles to a regex matched against the rendered tuple 
value (and never matches null).
+   */
+  public static final class CompiledGlob
+  {
+    public static final CompiledGlob MATCH_ANY = new CompiledGlob(true, null);
+
+    final boolean matchAny;
+    @Nullable
+    final Pattern pattern;
+
+    private CompiledGlob(boolean matchAny, @Nullable Pattern pattern)
+    {
+      if (matchAny) {
+        DruidException.conditionalDefensive(pattern == null, "matchAny=true 
must not carry a pattern");
+      } else {
+        DruidException.conditionalDefensive(pattern != null, "pattern must be 
non-null when matchAny=false");
+      }
+      this.matchAny = matchAny;
+      this.pattern = pattern;
+    }
+
+    public boolean matches(@Nullable String value)
+    {
+      if (matchAny) {
+        return true;
+      }
+      if (value == null) {
+        return false;
+      }
+      return pattern.matcher(value).matches();
+    }
+
+    public boolean isMatchAny()
+    {
+      return matchAny;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
index b1c9c78ace1..65c69113b62 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
@@ -37,7 +37,8 @@ import java.util.Map;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = 
UnknownPartialLoadMatcher.class)
 @JsonSubTypes({
     @JsonSubTypes.Type(name = ExactProjectionPartialLoadMatcher.TYPE, value = 
ExactProjectionPartialLoadMatcher.class),
-    @JsonSubTypes.Type(name = WildcardProjectionPartialLoadMatcher.TYPE, value 
= WildcardProjectionPartialLoadMatcher.class)
+    @JsonSubTypes.Type(name = WildcardProjectionPartialLoadMatcher.TYPE, value 
= WildcardProjectionPartialLoadMatcher.class),
+    @JsonSubTypes.Type(name = WildcardClusterGroupPartialLoadMatcher.TYPE, 
value = WildcardClusterGroupPartialLoadMatcher.class)
 })
 public interface PartialLoadMatcher
 {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcher.java
new file mode 100644
index 00000000000..bf88d2952d0
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcher.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.ClusterGroupTuples;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Selects cluster groups whose clustering tuples match any of the configured 
per-column glob patterns, minus any
+ * groups matched by an entry in {@code excludePatterns}. Each pattern is a 
{@code Map<String, String>} where keys are
+ * clustering column names (or operator-side virtual column names, see below) 
and values are glob patterns matched
+ * against the rendered tuple value at that column. Columns omitted from a 
pattern are treated as wildcards. Glob
+ * syntax (including escape semantics) is shared with {@link 
WildcardProjectionPartialLoadMatcher} via {@link Globs}.
+ * <p>
+ * If the operator supplies {@link #getVirtualColumns()}, a pattern key may 
also reference one of those virtual
+ * columns. At match time, the matcher resolves such a key to a clustering 
column on the segment via
+ * {@link VirtualColumns#findEquivalent(VirtualColumns.Node)} between the 
matcher's VCs and the segment's clustering
+ * VCs (carried on {@link ClusterGroupTuples#virtualColumns()}). This lets 
operators author portable rules: they
+ * write their preferred VC name and expression, and the matcher resolves to 
whatever name the segment happens to use
+ * for the equivalent clustering VC.
+ * <p>
+ * If the operator-side VC for a pattern key has no equivalent clustering VC 
on the segment, the pattern is treated as
+ * non-matching for that segment (defensive against typos or schema drift). 
The operator-VC-first ordering also
+ * disambiguates the shadowing case where an operator-VC and a clustering 
column share a name: the operator-VC
+ * interpretation wins, and a pattern is only matchable when the VCs are 
actually equivalent.
+ * <p>
+ * Null clustering values are matched only by omitting the column entirely or 
using the literal {@code "*"} glob; both
+ * have explicit "match-anything-including-null" semantics. Any other glob 
(including {@code "null"}) does not match a
+ * null clustering value. Matching specifically by null clustering values 
(e.g., load only the null-keyed group) is
+ * not supported by this matcher and is left for a future typed-tuple matcher 
variant.
+ */
+public class WildcardClusterGroupPartialLoadMatcher extends 
ClusterGroupPartialLoadMatcher
+{
+  public static final String TYPE = "globClusterGroup";
+
+  private final List<Map<String, String>> patterns;
+  private final List<Map<String, String>> excludePatterns;
+  private final VirtualColumns virtualColumns;
+  private final List<Map<String, Globs.CompiledGlob>> compiledPatterns;
+  private final List<Map<String, Globs.CompiledGlob>> compiledExcludePatterns;
+
+  @JsonCreator
+  public WildcardClusterGroupPartialLoadMatcher(
+      @JsonProperty("patterns") List<Map<String, String>> patterns,
+      @JsonProperty("excludePatterns") @Nullable List<Map<String, String>> 
excludePatterns,
+      @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns
+  )
+  {
+    if (patterns == null || patterns.isEmpty()) {
+      throw InvalidInput.exception("patterns must not be null or empty for 
globClusterGroup matcher");
+    }
+    this.patterns = copyAndValidatePatterns(patterns, "patterns");
+    this.excludePatterns = excludePatterns == null
+                           ? List.of()
+                           : copyAndValidatePatterns(excludePatterns, 
"excludePatterns");
+    this.virtualColumns = internVirtualColumns(virtualColumns);
+    this.compiledPatterns = compileAll(this.patterns);
+    this.compiledExcludePatterns = compileAll(this.excludePatterns);
+  }
+
+  /**
+   * Convenience constructor for callers that don't carry operator-side 
virtual columns. Equivalent to passing
+   * {@code null} for the virtual columns argument.
+   */
+  public WildcardClusterGroupPartialLoadMatcher(
+      List<Map<String, String>> patterns,
+      @Nullable List<Map<String, String>> excludePatterns
+  )
+  {
+    this(patterns, excludePatterns, null);
+  }
+
+  @JsonProperty
+  public List<Map<String, String>> getPatterns()
+  {
+    return patterns;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public List<Map<String, String>> getExcludePatterns()
+  {
+    return excludePatterns;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public VirtualColumns getVirtualColumns()
+  {
+    return virtualColumns;
+  }
+
+  @Override
+  protected List<Integer> resolveClusterGroupIndices(DataSegment segment)
+  {
+    final ClusterGroupTuples clusterGroups = segment.getClusterGroups();
+    if (clusterGroups == null) {
+      return Collections.emptyList();
+    }
+    final RowSignature clusteringColumns = clusterGroups.clusteringColumns();
+    final VirtualColumns segmentVcs = clusterGroups.virtualColumns();
+    final List<List<Object>> tuples = clusterGroups.tuples();
+
+    // Per-pattern resolution: which clustering column does each pattern key 
map to? Resolution is segment-scoped
+    // (depends only on the segment's clustering signature + VCs), so compute 
it once up front and reuse it across
+    // tuples. A null entry in the resolved list marks the pattern as 
non-matching for this segment.
+    final List<Map<String, String>> resolvedPatterns = 
resolveAll(compiledPatterns, clusteringColumns, segmentVcs);
+    final List<Map<String, String>> resolvedExcludes = 
resolveAll(compiledExcludePatterns, clusteringColumns, segmentVcs);
+
+    final TreeSet<Integer> matched = new TreeSet<>();
+    for (int i = 0; i < tuples.size(); i++) {
+      final List<Object> tuple = tuples.get(i);
+      if (!matchesAnyPattern(tuple, clusteringColumns, compiledPatterns, 
resolvedPatterns)) {
+        continue;
+      }
+      if (matchesAnyPattern(tuple, clusteringColumns, compiledExcludePatterns, 
resolvedExcludes)) {
+        continue;
+      }
+      matched.add(i);
+    }
+    return new ArrayList<>(matched);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WildcardClusterGroupPartialLoadMatcher that = 
(WildcardClusterGroupPartialLoadMatcher) o;
+    return Objects.equals(patterns, that.patterns)
+           && Objects.equals(excludePatterns, that.excludePatterns)
+           && Objects.equals(virtualColumns, that.virtualColumns);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(patterns, excludePatterns, virtualColumns);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "WildcardClusterGroupPartialLoadMatcher{patterns=" + patterns
+           + ", excludePatterns=" + excludePatterns
+           + ", virtualColumns=" + virtualColumns + "}";
+  }
+
+  /**
+   * For each compiled pattern, compute the {@code patternKey -> segment 
clustering column name} map. A {@code null}
+   * entry in the returned list marks a pattern as non-matching for this 
segment (some pattern key couldn't be
+   * resolved to a clustering column via either direct name or operator-VC 
equivalence).
+   */
+  private List<Map<String, String>> resolveAll(
+      List<Map<String, Globs.CompiledGlob>> compiled,
+      RowSignature clusteringColumns,
+      VirtualColumns segmentVcs
+  )
+  {
+    if (compiled.isEmpty()) {
+      return List.of();
+    }
+    final List<Map<String, String>> out = new ArrayList<>(compiled.size());
+    for (Map<String, Globs.CompiledGlob> pattern : compiled) {
+      out.add(resolvePattern(pattern.keySet(), clusteringColumns, segmentVcs));
+    }
+    return out;
+  }
+
+  @Nullable
+  private Map<String, String> resolvePattern(
+      Set<String> patternKeys,
+      RowSignature clusteringColumns,
+      VirtualColumns segmentVcs
+  )
+  {
+    final Map<String, String> resolved = 
CollectionUtils.newLinkedHashMapWithExpectedSize(patternKeys.size());
+    for (String key : patternKeys) {
+      final String target = resolveKey(key, clusteringColumns, segmentVcs);
+      if (target == null) {
+        return null;   // pattern unresolvable for this segment
+      }
+      resolved.put(key, target);
+    }
+    return resolved;
+  }
+
+  /**
+   * Resolve a pattern key to a clustering column name on the segment. Three 
cases:
+   * <ol>
+   *   <li>The matcher carries a virtual column by this name: resolve via
+   *       {@link VirtualColumns#findEquivalent(VirtualColumns.Node)} against 
the segment's clustering VCs. The
+   *       matcher VC interpretation wins regardless of any same-name 
clustering column (shadowing).</li>
+   *   <li>Otherwise, if the key is directly a clustering column name -> 
identity.</li>
+   *   <li>Otherwise -> unresolvable (returns {@code null}).</li>
+   * </ol>
+   */
+  @Nullable
+  private String resolveKey(String key, RowSignature clusteringColumns, 
VirtualColumns segmentVcs)
+  {
+    final VirtualColumns.Node operatorVcNode = virtualColumns.getNode(key);
+    if (operatorVcNode != null) {
+      final VirtualColumn equivalent = 
segmentVcs.findEquivalent(operatorVcNode);
+      if (equivalent == null) {
+        return null;
+      }
+      final String equivalentName = equivalent.getOutputName();
+      return clusteringColumns.contains(equivalentName) ? equivalentName : 
null;
+    }
+    return clusteringColumns.contains(key) ? key : null;
+  }
+
+  private static boolean matchesAnyPattern(
+      List<Object> tuple,
+      RowSignature clusteringColumns,
+      List<Map<String, Globs.CompiledGlob>> compiledPatterns,
+      List<Map<String, String>> resolvedPatterns
+  )
+  {
+    for (int p = 0; p < compiledPatterns.size(); p++) {
+      final Map<String, String> resolved = resolvedPatterns.get(p);
+      if (resolved == null) {
+        continue;
+      }
+      if (matchesPattern(tuple, clusteringColumns, compiledPatterns.get(p), 
resolved)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean matchesPattern(
+      List<Object> tuple,
+      RowSignature clusteringColumns,
+      Map<String, Globs.CompiledGlob> pattern,
+      Map<String, String> resolved
+  )
+  {
+    for (Map.Entry<String, Globs.CompiledGlob> entry : pattern.entrySet()) {
+      final String resolvedColumn = resolved.get(entry.getKey());
+      final int idx = clusteringColumns.indexOf(resolvedColumn);
+      // resolved is guaranteed to map every patternKey to a real clustering 
column (else the pattern was skipped).
+      final Globs.CompiledGlob glob = entry.getValue();
+      if (glob.isMatchAny()) {
+        // Literal "*" matches every value, including null.
+        continue;
+      }
+      final Object value = tuple.get(idx);
+      if (value == null) {
+        // Any non-"*" glob cannot match a null clustering value.
+        return false;
+      }
+      if (!glob.matches(value.toString())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static List<Map<String, String>> 
copyAndValidatePatterns(List<Map<String, String>> input, String fieldName)
+  {
+    final List<Map<String, String>> out = new ArrayList<>(input.size());
+    for (int i = 0; i < input.size(); i++) {
+      final Map<String, String> pattern = input.get(i);
+      if (pattern == null || pattern.isEmpty()) {
+        throw InvalidInput.exception("%s[%s] must not be null or empty", 
fieldName, i);
+      }
+      out.add(Map.copyOf(pattern));
+    }
+    return List.copyOf(out);
+  }
+
+  private static List<Map<String, Globs.CompiledGlob>> 
compileAll(List<Map<String, String>> patterns)
+  {
+    if (patterns.isEmpty()) {
+      return List.of();
+    }
+    final List<Map<String, Globs.CompiledGlob>> out = new 
ArrayList<>(patterns.size());
+    for (Map<String, String> pattern : patterns) {
+      final Map<String, Globs.CompiledGlob> compiled = 
CollectionUtils.newLinkedHashMapWithExpectedSize(pattern.size());
+      for (Map.Entry<String, String> entry : pattern.entrySet()) {
+        compiled.put(entry.getKey(), Globs.compile(entry.getValue()));
+      }
+      out.add(Collections.unmodifiableMap(compiled));
+    }
+    return List.copyOf(out);
+  }
+
+  private static VirtualColumns internVirtualColumns(@Nullable VirtualColumns 
virtualColumns)
+  {
+    if (virtualColumns == null || virtualColumns.isEmpty()) {
+      return VirtualColumns.EMPTY;
+    }
+    return VirtualColumns.create(
+        Arrays.stream(virtualColumns.getVirtualColumns())
+              .map(DataSegment.virtualColumnInterner()::intern)
+              .toList()
+    );
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
index 934eaf6f958..5adf3c09642 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
@@ -72,8 +72,8 @@ public class WildcardProjectionPartialLoadMatcher extends 
ProjectionPartialLoadM
     }
     this.patterns = List.copyOf(patterns);
     this.excludePatterns = excludePatterns == null ? List.of() : 
List.copyOf(excludePatterns);
-    this.compiledPatterns = compileAll(this.patterns);
-    this.compiledExcludePatterns = compileAll(this.excludePatterns);
+    this.compiledPatterns = Globs.compileAll(this.patterns);
+    this.compiledExcludePatterns = Globs.compileAll(this.excludePatterns);
   }
 
   @JsonProperty
@@ -98,99 +98,16 @@ public class WildcardProjectionPartialLoadMatcher extends 
ProjectionPartialLoadM
     }
     final TreeSet<String> matched = new TreeSet<>();
     for (String name : segmentProjections) {
-      if (matchesAny(name, compiledExcludePatterns)) {
+      if (Globs.matchesAny(name, compiledExcludePatterns)) {
         continue;
       }
-      if (matchesAny(name, compiledPatterns)) {
+      if (Globs.matchesAny(name, compiledPatterns)) {
         matched.add(name);
       }
     }
     return new ArrayList<>(matched);
   }
 
-  private static List<Pattern> compileAll(List<String> globs)
-  {
-    if (globs.isEmpty()) {
-      return List.of();
-    }
-    final List<Pattern> compiled = new ArrayList<>(globs.size());
-    for (String glob : globs) {
-      compiled.add(Pattern.compile(globToRegex(glob)));
-    }
-    return List.copyOf(compiled);
-  }
-
-  private static boolean matchesAny(String name, List<Pattern> patterns)
-  {
-    for (Pattern pattern : patterns) {
-      if (pattern.matcher(name).matches()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Translates a glob pattern with {@code *}, {@code ?}, and {@code \} escape 
semantics into an equivalent regex
-   * pattern that matches the entire input string. Regex metacharacters in 
literal positions are escaped.
-   *
-   * @throws org.apache.druid.error.DruidException if {@code glob} ends with 
an unescaped backslash
-   */
-  static String globToRegex(String glob)
-  {
-    final StringBuilder sb = new StringBuilder(glob.length() + 4);
-    boolean escaping = false;
-    for (int i = 0; i < glob.length(); i++) {
-      final char c = glob.charAt(i);
-      if (escaping) {
-        appendLiteral(sb, c);
-        escaping = false;
-        continue;
-      }
-      switch (c) {
-        case '\\':
-          escaping = true;
-          break;
-        case '*':
-          sb.append(".*");
-          break;
-        case '?':
-          sb.append('.');
-          break;
-        default:
-          appendLiteral(sb, c);
-      }
-    }
-    if (escaping) {
-      throw InvalidInput.exception("Glob pattern [%s] ends with an unescaped 
backslash", glob);
-    }
-    return sb.toString();
-  }
-
-  private static void appendLiteral(StringBuilder sb, char c)
-  {
-    switch (c) {
-      case '.':
-      case '(':
-      case ')':
-      case '[':
-      case ']':
-      case '{':
-      case '}':
-      case '+':
-      case '|':
-      case '^':
-      case '$':
-      case '\\':
-      case '*':
-      case '?':
-        sb.append('\\').append(c);
-        break;
-      default:
-        sb.append(c);
-    }
-  }
-
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index fa5971e9657..4f76b7b52c1 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2761,6 +2761,7 @@ public class CachingClusteredClientTest
             null,
             null,
             null,
+            null,
             NoneShardSpec.instance(),
             null,
             -1,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
index 1546eb76cdf..afee36e530b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
@@ -248,6 +248,7 @@ public class CreateDataSegments
           Collections.emptyList(),
           Collections.emptyList(),
           Collections.emptyList(),
+          null,
           shardSpec,
           compactionState,
           IndexIO.CURRENT_VERSION_ID,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/GlobsTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/GlobsTest.java
new file mode 100644
index 00000000000..b8ad7aa9180
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/GlobsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import org.apache.druid.error.DruidException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Focused unit coverage for the shared {@link Globs} helpers. Integration semantics (e.g., how patterns interact with
+ * excludes inside a matcher) live with the matcher tests; this file pins the translation rules so future refactors
+ * don't drift them silently.
+ */
+class GlobsTest
+{
+  @Test
+  void testLiteralGlobMatchesItself()
+  {
+    Assertions.assertEquals("user_daily", Globs.globToRegex("user_daily"));
+  }
+
+  @Test
+  void testStarTranslatesToDotStar()
+  {
+    Assertions.assertEquals("user_.*", Globs.globToRegex("user_*"));
+  }
+
+  @Test
+  void testQuestionMarkTranslatesToDot()
+  {
+    Assertions.assertEquals("user_.", Globs.globToRegex("user_?"));
+  }
+
+  @Test
+  void testRegexMetacharactersInLiteralPositionsAreEscaped()
+  {
+    // Each of these has special regex meaning; the translator must escape them so they match literally.
+    Assertions.assertEquals("user\\.daily", Globs.globToRegex("user.daily"));
+    Assertions.assertEquals("a\\+b", Globs.globToRegex("a+b"));
+    Assertions.assertEquals("\\(foo\\)", Globs.globToRegex("(foo)"));
+    Assertions.assertEquals("a\\|b", Globs.globToRegex("a|b"));
+    Assertions.assertEquals("\\^start", Globs.globToRegex("^start"));
+    Assertions.assertEquals("end\\$", Globs.globToRegex("end$"));
+  }
+
+  @Test
+  void testBackslashEscapesGlobMetacharacters()
+  {
+    // Escaped glob metacharacters become literal regex characters (themselves escaped).
+    Assertions.assertEquals("a\\*b", Globs.globToRegex("a\\*b"));
+    Assertions.assertEquals("a\\?b", Globs.globToRegex("a\\?b"));
+    Assertions.assertEquals("a\\\\b", Globs.globToRegex("a\\\\b"));
+  }
+
+  @Test
+  void testTrailingUnescapedBackslashIsRejected()
+  {
+    Assertions.assertThrows(DruidException.class, () -> 
Globs.globToRegex("foo\\"));
+  }
+
+  @Test
+  void testCompileAllReturnsEmptyListForEmptyInput()
+  {
+    Assertions.assertTrue(Globs.compileAll(List.of()).isEmpty());
+  }
+
+  @Test
+  void testCompileAllCompilesEachPattern()
+  {
+    final List<Pattern> compiled = Globs.compileAll(List.of("user_*", 
"report_?"));
+    Assertions.assertEquals(2, compiled.size());
+    Assertions.assertTrue(compiled.get(0).matcher("user_daily").matches());
+    Assertions.assertTrue(compiled.get(1).matcher("report_x").matches());
+    Assertions.assertFalse(compiled.get(1).matcher("report_xy").matches());
+  }
+
+  @Test
+  void testMatchesAnyReturnsTrueOnFirstHit()
+  {
+    final List<Pattern> compiled = Globs.compileAll(List.of("user_*", 
"report_*"));
+    Assertions.assertTrue(Globs.matchesAny("user_daily", compiled));
+    Assertions.assertTrue(Globs.matchesAny("report_hourly", compiled));
+    Assertions.assertFalse(Globs.matchesAny("other", compiled));
+  }
+
+  @Test
+  void testMatchesAnyEmptyPatternListNeverMatches()
+  {
+    Assertions.assertFalse(Globs.matchesAny("anything", List.of()));
+  }
+
+  @Test
+  void testCompileLiteralStarReturnsMatchAny()
+  {
+    // The literal "*" is special-cased to MATCH_ANY so callers can short-circuit a "matches any value, including
+    // null" branch without paying a regex match. Any other glob takes the regex path.
+    final Globs.CompiledGlob compiled = Globs.compile("*");
+    Assertions.assertSame(Globs.CompiledGlob.MATCH_ANY, compiled);
+    Assertions.assertTrue(compiled.matchAny);
+    Assertions.assertNull(compiled.pattern);
+  }
+
+  @Test
+  void testCompileGlobReturnsCompiledRegex()
+  {
+    final Globs.CompiledGlob compiled = Globs.compile("us-*");
+    Assertions.assertFalse(compiled.matchAny);
+    Assertions.assertNotNull(compiled.pattern);
+    Assertions.assertTrue(compiled.matches("us-east-1"));
+    Assertions.assertFalse(compiled.matches("eu-west"));
+  }
+
+  @Test
+  void testCompileLiteralNullStringHasNoSpecialTreatment()
+  {
+    // The string "null" goes through the regex path like any other literal glob, the helper does not give the
+    // literal-string "null" any special meaning
+    final Globs.CompiledGlob compiled = Globs.compile("null");
+    Assertions.assertFalse(compiled.matchAny);
+    Assertions.assertNotNull(compiled.pattern);
+    Assertions.assertTrue(compiled.matches("null"));
+    Assertions.assertFalse(compiled.matches("nullx"));
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcherTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcherTest.java
new file mode 100644
index 00000000000..0ff1bdc804e
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcherTest.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.timeline.ClusterGroupTuples;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+class WildcardClusterGroupPartialLoadMatcherTest
+{
+  private static final Map<String, Object> BASE_LOAD_SPEC = Map.of("type", 
"local", "path", "/seg");
+
+  private final ObjectMapper mapper = new DefaultObjectMapper();
+
+  @BeforeEach
+  void setUp()
+  {
+    final InjectableValues.Std injectables = new InjectableValues.Std();
+    injectables.addValue(DataSegment.PruneSpecsHolder.class, 
DataSegment.PruneSpecsHolder.DEFAULT);
+    mapper.setInjectableValues(injectables);
+  }
+
+  private static RowSignature tenantRegion()
+  {
+    return RowSignature.builder()
+                       .add("tenant", ColumnType.STRING)
+                       .add("region", ColumnType.STRING)
+                       .build();
+  }
+
+  /** A 3-group fixture: (acme, us-east-1), (acme, us-west-2), (globex, us-east-1). */
+  private static DataSegment threeGroupSegment()
+  {
+    return segmentWithGroups(new ClusterGroupTuples(
+        tenantRegion(),
+        List.of(
+            List.of("acme", "us-east-1"),
+            List.of("acme", "us-west-2"),
+            List.of("globex", "us-east-1")
+        )
+    ));
+  }
+
+  private static DataSegment segmentWithGroups(ClusterGroupTuples groups)
+  {
+    return DataSegment.builder(SegmentId.of(
+        "ds",
+        Intervals.of("2026-01-01/2026-01-02"),
+        "v",
+        new NumberedShardSpec(0, 1)
+    )).loadSpec(BASE_LOAD_SPEC).size(0).clusterGroups(groups).build();
+  }
+
+  @Test
+  void testConstructorRejectsNullPatterns()
+  {
+    Assertions.assertThrows(DruidException.class, () -> new 
WildcardClusterGroupPartialLoadMatcher(null, null));
+  }
+
+  @Test
+  void testConstructorRejectsEmptyPatterns()
+  {
+    Assertions.assertThrows(DruidException.class, () -> new 
WildcardClusterGroupPartialLoadMatcher(List.of(), null));
+  }
+
+  @Test
+  void testConstructorRejectsEmptyPatternMap()
+  {
+    Assertions.assertThrows(
+        DruidException.class,
+        () -> new WildcardClusterGroupPartialLoadMatcher(List.of(Map.of()), 
null)
+    );
+  }
+
+  @Test
+  void testConstructorRejectsEmptyExcludePatternMap()
+  {
+    Assertions.assertThrows(
+        DruidException.class,
+        () -> new 
WildcardClusterGroupPartialLoadMatcher(List.of(Map.of("tenant", "*")), 
List.of(Map.of()))
+    );
+  }
+
+  @Test
+  void testNonClusteredSegmentReturnsNull()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null
+    );
+    final DataSegment segment = DataSegment.builder(SegmentId.of(
+        "ds",
+        Intervals.of("2026-01-01/2026-01-02"),
+        "v",
+        new NumberedShardSpec(0, 1)
+    )).loadSpec(BASE_LOAD_SPEC).size(0).build();
+    Assertions.assertNull(matcher.match(segment, BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testSingleColumnExactMatch()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = 
matcher.match(threeGroupSegment(), BASE_LOAD_SPEC);
+    Assertions.assertNotNull(result);
+    Assertions.assertEquals(List.of(0, 1), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testSingleColumnGlob()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("region", "us-east-*")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = 
matcher.match(threeGroupSegment(), BASE_LOAD_SPEC);
+    Assertions.assertNotNull(result);
+    Assertions.assertEquals(List.of(0, 2), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testMultiColumnPatternWithExactAndGlob()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "globex", "region", "us-east-*")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = 
matcher.match(threeGroupSegment(), BASE_LOAD_SPEC);
+    Assertions.assertEquals(List.of(2), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testOmittedColumnIsWildcard()
+  {
+    // No region constraint -> all acme rows match regardless of region.
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = 
matcher.match(threeGroupSegment(), BASE_LOAD_SPEC);
+    Assertions.assertEquals(List.of(0, 1), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testExcludePatternRemovesGroups()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        List.of(Map.of("region", "us-west-2"))
+    );
+    final PartialLoadMatcher.MatchResult result = 
matcher.match(threeGroupSegment(), BASE_LOAD_SPEC);
+    Assertions.assertEquals(List.of(0), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testStarMatchesAllIncludingNullValuesAtThatColumn()
+  {
+    final DataSegment segment = segmentWithGroups(new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(
+            Arrays.asList("acme", "us-east-1"),
+            Arrays.asList("acme", null),
+            Arrays.asList(null, "us-east-1")
+        )
+    ));
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("region", "*")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, 
BASE_LOAD_SPEC);
+    // The "*" glob matches every value including null at the region position.
+    Assertions.assertEquals(List.of(0, 1, 2), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testOmittedColumnMatchesNullAtThatPosition()
+  {
+    final DataSegment segment = segmentWithGroups(new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(
+            Arrays.asList("acme", "us-east-1"),
+            Arrays.asList("acme", null)
+        )
+    ));
+    // No constraint on region; the null-region tuple is still picked up.
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, 
BASE_LOAD_SPEC);
+    Assertions.assertEquals(List.of(0, 1), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testNonStarGlobDoesNotMatchNull()
+  {
+    final DataSegment segment = segmentWithGroups(new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(
+            Arrays.asList("acme", "us-east-1"),
+            Arrays.asList("acme", null)
+        )
+    ));
+    // "us-*" must not match a null region (the only way to match null is "*" or omission).
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("region", "us-*")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, 
BASE_LOAD_SPEC);
+    Assertions.assertEquals(List.of(0), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testLiteralStringNullDoesNotMatchNullClusteringValue()
+  {
+    final DataSegment segment = segmentWithGroups(new ClusterGroupTuples(
+        tenantRegion(),
+        Arrays.asList(Arrays.asList("acme", null))
+    ));
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("region", "null")),
+        null
+    );
+    Assertions.assertNull(matcher.match(segment, BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testUnknownColumnInPatternNoMatch()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("not_a_clustering_column", "anything")),
+        null
+    );
+    Assertions.assertNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testNoMatchReturnsNull()
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "nobody")),
+        null
+    );
+    Assertions.assertNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testIndicesAreSortedAndDeduped()
+  {
+    // Two patterns that both match the same group should not duplicate the index in the output.
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme"), Map.of("region", "us-east-1")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult result = 
matcher.match(threeGroupSegment(), BASE_LOAD_SPEC);
+    // acme matches 0, 1; us-east-1 matches 0, 2 — union sorted = [0, 1, 2].
+    Assertions.assertEquals(List.of(0, 1, 2), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testFingerprintStableAcrossPatternOrder()
+  {
+    final WildcardClusterGroupPartialLoadMatcher a = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme"), Map.of("region", "us-east-1")),
+        null
+    );
+    final WildcardClusterGroupPartialLoadMatcher b = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("region", "us-east-1"), Map.of("tenant", "acme")),
+        null
+    );
+    final PartialLoadMatcher.MatchResult ra = a.match(threeGroupSegment(), 
BASE_LOAD_SPEC);
+    final PartialLoadMatcher.MatchResult rb = b.match(threeGroupSegment(), 
BASE_LOAD_SPEC);
+    Assertions.assertEquals(ra.fingerprint(), rb.fingerprint());
+  }
+
+  @Test
+  void testJsonRoundTrip() throws Exception
+  {
+    final WildcardClusterGroupPartialLoadMatcher original = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme"), Map.of("tenant", "globex", "region", 
"us-east-*")),
+        List.of(Map.of("region", "us-west-2"))
+    );
+    final String json = mapper.writeValueAsString(original);
+    final PartialLoadMatcher back = mapper.readValue(json, 
PartialLoadMatcher.class);
+    Assertions.assertEquals(original, back);
+  }
+
+  @Test
+  void testOperatorVcResolvesToClusteringVcByEquivalence()
+  {
+    // Operator names their VC "queryLower" with lower(tenant); the segment's clustering VC is "tenant_lower" with
+    // the same expression. The matcher should find equivalence and match the segment's tuples.
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("queryLower", "acme")),
+        null,
+        lowerTenantVcs("queryLower")
+    );
+    final DataSegment segment = vcClusteredSegment("acme", "globex");
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, 
BASE_LOAD_SPEC);
+    Assertions.assertNotNull(result);
+    Assertions.assertEquals(List.of(0), 
result.wrappedLoadSpec().get("clusterGroupIndices"));
+  }
+
+  @Test
+  void testOperatorVcWithoutEquivalenceIsNonMatching()
+  {
+    // Operator-VC computes upper(tenant); segment's clustering VC computes lower(tenant). Not equivalent → the
+    // pattern is non-matching for this segment.
+    final VirtualColumns operatorVcs = VirtualColumns.create(new 
ExpressionVirtualColumn(
+        "queryUpper",
+        "upper(tenant)",
+        ColumnType.STRING,
+        TestExprMacroTable.INSTANCE
+    ));
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("queryUpper", "ACME")),
+        null,
+        operatorVcs
+    );
+    Assertions.assertNull(matcher.match(vcClusteredSegment("acme"), 
BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testOperatorVcShadowsClusteringColumnName()
+  {
+    // Operator declares a VC named "tenant" with an unrelated expression. The pattern key "tenant" resolves through
+    // the operator's VC (operator-VC interpretation wins). Without an equivalent on the segment side, the pattern
+    // is non-matching even though "tenant" happens to also be a clustering column name on a different segment.
+    final VirtualColumns operatorVcs = VirtualColumns.create(new 
ExpressionVirtualColumn(
+        "tenant",
+        "lower(otherCol)",
+        ColumnType.STRING,
+        TestExprMacroTable.INSTANCE
+    ));
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null,
+        operatorVcs
+    );
+    // Segment that clusters directly on physical "tenant" (no segment VCs). The operator-VC shadowing rule means
+    // we don't silently treat the pattern's "tenant" as the clustering column "tenant" — it's interpreted as the
+    // operator-VC, which has no equivalent on the segment → non-matching.
+    Assertions.assertNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testNoOperatorVcsKeepsDirectNameMatching()
+  {
+    // Sanity check: when no operator VCs are configured, the pattern key path falls through to direct clustering
+    // column name resolution, unchanged from the pre-VC behavior.
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null
+    );
+    Assertions.assertNotNull(matcher.match(threeGroupSegment(), 
BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testOperatorVcJsonRoundTrip() throws Exception
+  {
+    // Round-trip needs an injectable ExprMacroTable for ExpressionVirtualColumn deserialization.
+    mapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(DataSegment.PruneSpecsHolder.class, 
DataSegment.PruneSpecsHolder.DEFAULT)
+            .addValue(ExprMacroTable.class, TestExprMacroTable.INSTANCE)
+    );
+    final WildcardClusterGroupPartialLoadMatcher original = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("queryLower", "acme")),
+        null,
+        lowerTenantVcs("queryLower")
+    );
+    final String json = mapper.writeValueAsString(original);
+    Assertions.assertTrue(json.contains("\"virtualColumns\""), () -> "expected 
virtualColumns in JSON: " + json);
+    final PartialLoadMatcher back = mapper.readValue(json, 
PartialLoadMatcher.class);
+    Assertions.assertEquals(original, back);
+  }
+
+  @Test
+  void testVirtualColumnsOmittedFromJsonWhenEmpty() throws Exception
+  {
+    final WildcardClusterGroupPartialLoadMatcher matcher = new 
WildcardClusterGroupPartialLoadMatcher(
+        List.of(Map.of("tenant", "acme")),
+        null
+    );
+    final String json = mapper.writeValueAsString(matcher);
+    Assertions.assertFalse(json.contains("virtualColumns"), () -> "did not 
expect virtualColumns in JSON: " + json);
+  }
+
+  private static VirtualColumns lowerTenantVcs(String outputName)
+  {
+    return VirtualColumns.create(new ExpressionVirtualColumn(
+        outputName,
+        "lower(tenant)",
+        ColumnType.STRING,
+        TestExprMacroTable.INSTANCE
+    ));
+  }
+
+  private static DataSegment vcClusteredSegment(String... lowerTenants)
+  {
+    final RowSignature clusteringColumns = 
RowSignature.builder().add("tenant_lower", ColumnType.STRING).build();
+    final List<List<Object>> tuples = new ArrayList<>(lowerTenants.length);
+    for (String t : lowerTenants) {
+      tuples.add(Collections.singletonList(t));
+    }
+    return segmentWithGroups(new ClusterGroupTuples(clusteringColumns, 
lowerTenantVcs("tenant_lower"), tuples));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to