This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e3b61e0f3f0 Add `strict` and `pooled` Broker tier selector strategies
(#19094)
e3b61e0f3f0 is described below
commit e3b61e0f3f053476ffcd2c0e666d77b9b5a0f0ac
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Tue Mar 10 10:50:25 2026 -0700
Add `strict` and `pooled` Broker tier selector strategies (#19094)
Adds two new Broker TierSelectorStrategy implementations to provide finer
control over how Brokers select Historical and Realtime servers for query
execution.
- strict – Only selects servers whose priorities match the configured list.
Unlike other existing strategies, there is no fallback to servers with
other priorities if the configured priorities are unavailable. This also
addresses a current limitation with watched tiers: when multiple tiers are
configured, Brokers can still retain visibility into the state of the cluster,
while enforcing query isolation at the time of server selection rather than
filtering servers at the time of building the Broker's server view.
- pooled – Pools servers across the configured priorities and selects among
them, allowing queries to utilize multiple priority tiers for improved
availability. This is particularly useful for querying realtime servers where
the number of task replicas per tier may be limited for cost reasons.
Both strategies require the configured set of priorities to be non-empty.
Similar to queries routed to tiers that are not part of the watched tiers,
these strategies may result in queries returning no data if the configured
tiers are unavailable.
---
.../selector/AbstractTierSelectorStrategy.java | 2 +-
.../selector/PooledTierSelectorStrategy.java | 156 ++++++++
.../selector/PooledTierSelectorStrategyConfig.java | 63 ++++
.../selector/StrictTierSelectorStrategy.java | 183 ++++++++++
.../selector/StrictTierSelectorStrategyConfig.java | 63 ++++
.../client/selector/TierSelectorStrategy.java | 2 +
.../client/selector/TierSelectorStrategyTest.java | 401 +++++++++++++++++++++
.../druid/cli/BrokerRealtimeSelectorModule.java | 23 ++
.../main/java/org/apache/druid/cli/CliBroker.java | 4 +
.../java/org/apache/druid/cli/CliBrokerTest.java | 161 +++++++++
10 files changed, 1057 insertions(+), 1 deletion(-)
diff --git
a/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java
b/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java
index 59a611df30f..1821245cbf3 100644
---
a/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java
+++
b/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java
@@ -34,7 +34,7 @@ import java.util.Set;
*/
public abstract class AbstractTierSelectorStrategy implements
TierSelectorStrategy
{
- private final ServerSelectorStrategy serverSelectorStrategy;
+ protected final ServerSelectorStrategy serverSelectorStrategy;
public AbstractTierSelectorStrategy(ServerSelectorStrategy
serverSelectorStrategy)
{
diff --git
a/server/src/main/java/org/apache/druid/client/selector/PooledTierSelectorStrategy.java
b/server/src/main/java/org/apache/druid/client/selector/PooledTierSelectorStrategy.java
new file mode 100644
index 00000000000..3b77e01086d
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/selector/PooledTierSelectorStrategy.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import org.apache.druid.client.QueryableDruidServer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link TierSelectorStrategy} that pools servers with the configured set
of priorities from {@link PooledTierSelectorStrategyConfig#getPriorities()}
+ * and delegates server selection to the configured {@link
ServerSelectorStrategy}.
+ * <p>
+ * Unlike other {@link TierSelectorStrategy} like {@link
CustomTierSelectorStrategy}
+ * which has a preference for priority order, this strategy treats all
configured priorities equally
+ * by combining their servers into a single selection pool and delegates to
{@link ServerSelectorStrategy} to do
+ * the server selection. If no servers match the configured priorities in the
pool, an empty server list is returned,
+ * which may cause queries to return partial or no data.
+ * <p>
+ * Example configuration:
+ * <li> <code> druid.broker.select.tier=pooled </code> </li>
+ * <li> <code> druid.broker.select.tier.pooled.priorities=[2,1] </code> </li>
+ * <p>
+ * With this configuration, servers with priority 2 and 1 are pooled together
and
+ * selection is delegated to the {@link ServerSelectorStrategy}. Servers with
other
+ * priorities are ignored.
+ */
+public class PooledTierSelectorStrategy extends AbstractTierSelectorStrategy
+{
+ private static final EmittingLogger log = new
EmittingLogger(PooledTierSelectorStrategy.class);
+ public static final String TYPE = "pooled";
+
+ private final PooledTierSelectorStrategyConfig config;
+ private final ServiceEmitter emitter;
+ private final Set<Integer> configuredPriorities;
+
+ @JsonCreator
+ public PooledTierSelectorStrategy(
+ @JacksonInject final ServerSelectorStrategy serverSelectorStrategy,
+ @JacksonInject final PooledTierSelectorStrategyConfig config,
+ @JacksonInject final ServiceEmitter emitter
+ )
+ {
+ super(serverSelectorStrategy);
+ this.config = config;
+ this.emitter = emitter;
+ this.configuredPriorities = config.getPriorities();
+ }
+
+ @Override
+ public <T> List<QueryableDruidServer> pick(
+ @Nullable final Query<T> query,
+ final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
+ final DataSegment segment,
+ final int numServersToPick
+ )
+ {
+ final Set<QueryableDruidServer> candidateServerPool = new
LinkedHashSet<>();
+
+ for (Int2ObjectMap.Entry<Set<QueryableDruidServer>> entry :
prioritizedServers.int2ObjectEntrySet()) {
+ final int priority = entry.getIntKey();
+ final Set<QueryableDruidServer> servers = entry.getValue();
+
+ if (configuredPriorities.contains(priority)) {
+ candidateServerPool.addAll(servers);
+ } else {
+ log.debug(
+ "Server priority[%d] not in the configured list of priorities[%s]
so ignore servers[%s] for query[%s]",
+ priority, config.getPriorities(), servers, query
+ );
+ }
+ }
+
+ if (candidateServerPool.isEmpty()) {
+ if (query == null || query instanceof SegmentMetadataQuery) {
+ // Debug logging to reduce logging spam as these are typically
system-generated segment metadata queries
+ log.debug(
+ "No server found for query[%s] from server priorities[%s].
Configured priorities[%s].",
+ query, prioritizedServers.keySet(), config.getPriorities()
+ );
+ } else {
+ log.warn(
+ "No servers found for query[%s] matching configured
priorities[%s]. Available priorities[%s].",
+ query, config.getPriorities(), prioritizedServers.keySet()
+ );
+ emitter.emit(
+ ServiceMetricEvent.builder()
+ .setMetric("tierSelector/noServer", 1)
+ .setDimension("dataSource",
String.valueOf(query.getDataSource()))
+ .setDimension("tierSelectorType", TYPE)
+ .setDimension("queryType", query.getType())
+ .setDimension("queryPriority",
String.valueOf(query.context().getPriority()))
+ .setDimensionIfNotNull("queryId", query.getId())
+ );
+ }
+ return List.of();
+ }
+
+ final List<QueryableDruidServer> selectedServers =
serverSelectorStrategy.pick(query, candidateServerPool, segment,
numServersToPick);
+ log.debug("Selected servers[%s] for query[%s] from given servers[%s] and
candidateServerPool[%s]", selectedServers, query, prioritizedServers,
candidateServerPool);
+ return selectedServers;
+ }
+
+ /**
+ * @return the natural order of priorities since priority order doesn't
matter for this strategy as the configured set of
+ * priorities in the pool are treated equally and delegated to {@link
#serverSelectorStrategy}.
+ */
+ @Override
+ public Comparator<Integer> getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+ public PooledTierSelectorStrategyConfig getConfig()
+ {
+ return config;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PooledTierSelectorStrategy{" +
+ "config=" + config +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/client/selector/PooledTierSelectorStrategyConfig.java
b/server/src/main/java/org/apache/druid/client/selector/PooledTierSelectorStrategyConfig.java
new file mode 100644
index 00000000000..d1782851598
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/selector/PooledTierSelectorStrategyConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.Set;
+
+/**
+ * Configuration for {@link PooledTierSelectorStrategy}.
+ * <p>
+ * Requires a non-empty set of {@code priorities}. The order of priorities
don't matter.
+ */
+public class PooledTierSelectorStrategyConfig
+{
+ @JsonProperty
+ private final Set<Integer> priorities;
+
+ public Set<Integer> getPriorities()
+ {
+ return priorities;
+ }
+
+ public PooledTierSelectorStrategyConfig(@JsonProperty("priorities") final
Set<Integer> priorities)
+ {
+ if (CollectionUtils.isNullOrEmpty(priorities)) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "priorities must be non-empty when using pooled
tier selector on the Broker. Found priorities[%s].",
+ priorities
+ );
+ }
+ this.priorities = priorities;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PooledTierSelectorStrategyConfig{" +
+ "priorities=" + priorities +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/client/selector/StrictTierSelectorStrategy.java
b/server/src/main/java/org/apache/druid/client/selector/StrictTierSelectorStrategy.java
new file mode 100644
index 00000000000..048e6e5c479
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/selector/StrictTierSelectorStrategy.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import org.apache.druid.client.QueryableDruidServer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link TierSelectorStrategy} that only considers servers whose priorities
+ * are explicitly listed in {@link
StrictTierSelectorStrategyConfig#getPriorities()}.
+ * <p>
+ * Unlike other strategies like {@link CustomTierSelectorStrategy} that fall
back to servers with different priorities,
+ * this strategy strictly filters the available servers to the configured
priorities.
+ * If no servers match the configured priorities, an empty server list is
returned from {@link #pick(Query, Int2ObjectRBTreeMap, DataSegment, int)},
+ * which may cause queries to return partial or no data.
+ * <p>
+ * When multiple priorities are configured, they are evaluated in the order
specified, with earlier priorities
+ * preferred over later ones.
+ * <p>
+ * Example configuration:
+ * <li> <code> druid.broker.select.tier=strict </code> </li>
+ * <li> <code> druid.broker.select.tier.strict.priorities=[2,1] </code> </li>
+ * <p>
+ * With this configuration, servers with priority 2 are preferred over servers
with priority 1.
+ * Servers with any other tier priority are not considered.
+ * <p>
+ * This strategy is useful when query isolation is required between different
server priority tiers. Brokers may still
+ * be configured to watch all server tiers, allowing them to retain visibility
into the overall cluster state
+ * while enforcing isolation at query time.
+ */
+public class StrictTierSelectorStrategy extends AbstractTierSelectorStrategy
+{
+ private static final EmittingLogger log = new
EmittingLogger(StrictTierSelectorStrategy.class);
+ public static final String TYPE = "strict";
+
+ private final StrictTierSelectorStrategyConfig config;
+ private final ServiceEmitter emitter;
+ private final Map<Integer, Integer> configuredPriorities;
+ private final Comparator<Integer> comparator;
+
+ @JsonCreator
+ public StrictTierSelectorStrategy(
+ @JacksonInject final ServerSelectorStrategy serverSelectorStrategy,
+ @JacksonInject final StrictTierSelectorStrategyConfig config,
+ @JacksonInject final ServiceEmitter emitter
+ )
+ {
+ super(serverSelectorStrategy);
+ this.config = config;
+ this.emitter = emitter;
+
+ configuredPriorities = new HashMap<>();
+ for (int i = 0; i < config.getPriorities().size(); i++) {
+ configuredPriorities.put(config.getPriorities().get(i), i);
+ }
+
+ this.comparator = (p1, p2) -> {
+ final Integer rank1 = configuredPriorities.get(p1);
+ final Integer rank2 = configuredPriorities.get(p2);
+
+ if (rank1 != null && rank2 != null) {
+ return Integer.compare(rank1, rank2);
+ }
+ if (rank1 != null) {
+ return -1;
+ }
+ if (rank2 != null) {
+ return 1;
+ }
+
+ // Priorities outside configuredPriorities don't matter and won't be
selected in pick() for this strategy.
+ // This fallback ordering can be anything and is needed because the
comparator may be used by ServerSelector/ServerView
+ // which maintains views of all servers, including those with
unconfigured priorities.
+ return Integer.compare(p2, p1);
+ };
+ }
+
+ @Override
+ public <T> List<QueryableDruidServer> pick(
+ @Nullable final Query<T> query,
+ final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
+ final DataSegment segment,
+ final int numServersToPick
+ )
+ {
+ final Int2ObjectRBTreeMap<Set<QueryableDruidServer>>
candidatePrioritizedServers = new Int2ObjectRBTreeMap<>(getComparator());
+
+ for (Int2ObjectMap.Entry<Set<QueryableDruidServer>> entry :
prioritizedServers.int2ObjectEntrySet()) {
+ final int priority = entry.getIntKey();
+ final Set<QueryableDruidServer> servers = entry.getValue();
+
+ if (configuredPriorities.containsKey(priority)) {
+ candidatePrioritizedServers.put(priority, servers);
+ } else {
+ log.debug(
+ "Server priority[%d] not in the configured list of priorities[%s]
so ignore servers[%s] for query[%s]",
+ priority, config.getPriorities(), servers, query
+ );
+ }
+ }
+
+ if (candidatePrioritizedServers.isEmpty()) {
+ if (query == null || query instanceof SegmentMetadataQuery) {
+ // Debug logging to reduce logging spam as these are typically
system-generated segment metadata queries
+ log.debug(
+ "No server found for query[%s] from server priorities[%s].
Configured priorities[%s].",
+ query, prioritizedServers.keySet(), config.getPriorities()
+ );
+ } else {
+ log.warn(
+ "No servers found for query[%s] matching configured
priorities[%s]. Available priorities[%s].",
+ query, config.getPriorities(), prioritizedServers.keySet()
+ );
+ emitter.emit(
+ ServiceMetricEvent.builder()
+ .setMetric("tierSelector/noServer", 1)
+ .setDimension("dataSource",
String.valueOf(query.getDataSource()))
+ .setDimension("tierSelectorType", TYPE)
+ .setDimension("queryType", query.getType())
+ .setDimension("queryPriority",
String.valueOf(query.context().getPriority()))
+ .setDimensionIfNotNull("queryId", query.getId())
+ );
+ }
+ return List.of();
+ }
+
+ final List<QueryableDruidServer> selectedServers = super.pick(query,
candidatePrioritizedServers, segment, numServersToPick);
+ log.debug("Selected servers[%s] for query[%s] from given servers[%s] and
numServersToPick[%s]", selectedServers, query, prioritizedServers,
numServersToPick);
+ return selectedServers;
+ }
+
+ @Override
+ public Comparator<Integer> getComparator()
+ {
+ return comparator;
+ }
+
+ public StrictTierSelectorStrategyConfig getConfig()
+ {
+ return config;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StrictTierSelectorStrategy{" +
+ "config=" + config +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/client/selector/StrictTierSelectorStrategyConfig.java
b/server/src/main/java/org/apache/druid/client/selector/StrictTierSelectorStrategyConfig.java
new file mode 100644
index 00000000000..5ad02b11ee4
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/selector/StrictTierSelectorStrategyConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.selector;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * Configuration for {@link StrictTierSelectorStrategy}.
+ * <p>
+ * Requires a non-empty list of {@code priorities}.
+ */
+public class StrictTierSelectorStrategyConfig
+{
+ @JsonProperty
+ private final List<Integer> priorities;
+
+ public List<Integer> getPriorities()
+ {
+ return priorities;
+ }
+
+ public StrictTierSelectorStrategyConfig(@JsonProperty("priorities") final
List<Integer> priorities)
+ {
+ if (CollectionUtils.isNullOrEmpty(priorities)) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "priorities must be non-empty when using strict
tier selector on the Broker. Found priorities[%s].",
+ priorities
+ );
+ }
+ this.priorities = priorities;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StrictTierSelectorStrategyConfig{" +
+ "priorities=" + priorities +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
b/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
index 0cf3b3740ee..eb53bae61c7 100644
---
a/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
+++
b/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
@@ -39,6 +39,8 @@ import java.util.Set;
@JsonSubTypes.Type(name = LowestPriorityTierSelectorStrategy.TYPE, value =
LowestPriorityTierSelectorStrategy.class),
@JsonSubTypes.Type(name = CustomTierSelectorStrategy.TYPE, value =
CustomTierSelectorStrategy.class),
@JsonSubTypes.Type(name = PreferredTierSelectorStrategy.TYPE, value =
PreferredTierSelectorStrategy.class),
+ @JsonSubTypes.Type(name = StrictTierSelectorStrategy.TYPE, value =
StrictTierSelectorStrategy.class),
+ @JsonSubTypes.Type(name = PooledTierSelectorStrategy.TYPE, value =
PooledTierSelectorStrategy.class),
})
public interface TierSelectorStrategy
{
diff --git
a/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java
b/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java
index b05ec999646..2d2d047b3df 100644
---
a/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java
@@ -23,16 +23,25 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.QueryableDruidServer;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.CloneQueryMode;
import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
@@ -43,11 +52,28 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class TierSelectorStrategyTest
{
+ private StubServiceEmitter serviceEmitter;
+
+ private static final Query SAMPLE_GROUPBY_QUERY = GroupByQuery.builder()
+
.setDataSource("foo3")
+
.setInterval(new
MultipleIntervalSegmentSpec(List.of(Intervals.of("2000/3000"))))
+
.setGranularity(Granularities.ALL)
+
.setDimensions(new DefaultDimensionSpec("dim2", "d0"))
+ .build();
+
+ @Before
+ public void testSetup()
+ {
+ serviceEmitter = StubServiceEmitter.createStarted();
+ EmittingLogger.registerEmitter(serviceEmitter);
+ }
+
@Test
public void testHighestPriorityTierSelectorStrategyRealtime()
{
@@ -568,4 +594,379 @@ public class TierSelectorStrategyTest
preferredTierHighPriority, preferredTierLowPriority,
nonPreferredTierHighestPriority
);
}
+
+ @Test
+ public void testStrictTierSelectorStrategyAllConfigured()
+ {
+ DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
+ QueryableDruidServer pNeg1 = new QueryableDruidServer(
+ new DruidServer("test1", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, -1),
+ client
+ );
+ QueryableDruidServer p0 = new QueryableDruidServer(
+ new DruidServer("test2", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
+ client
+ );
+ QueryableDruidServer p2 = new QueryableDruidServer(
+ new DruidServer("test3", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 2),
+ client
+ );
+
+ testTierSelectorStrategy(
+ new StrictTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new StrictTierSelectorStrategyConfig(List.of(2, 0, -1)),
+ serviceEmitter
+ ),
+ p2, p0, pNeg1
+ );
+ }
+
+ @Test
+ public void testStrictTierSelectorStrategyWithPartialMatchOfPriorities()
+ {
+ // Create 3 servers with priorities: -1, 0, 2
+ DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
+ QueryableDruidServer pNeg1 = new QueryableDruidServer(
+ new DruidServer("test1", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, -1),
+ client
+ );
+ QueryableDruidServer p0 = new QueryableDruidServer(
+ new DruidServer("test2", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
+ client
+ );
+ QueryableDruidServer p2 = new QueryableDruidServer(
+ new DruidServer("test3", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 2),
+ client
+ );
+
+
+ // Configure strict strategy with only priorities [0, 2]
+ final ServerSelector serverSelector = new ServerSelector(
+
DataSegment.builder(SegmentId.dummy("foo")).shardSpec(NoneShardSpec.instance()).build(),
+ new StrictTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new StrictTierSelectorStrategyConfig(List.of(0, 2)),
+ serviceEmitter
+ ),
+ HistoricalFilter.IDENTITY_FILTER
+ );
+
+ for (QueryableDruidServer server : List.of(p0, pNeg1, p2)) {
+ serverSelector.addServerAndUpdateSegment(server,
serverSelector.getSegment());
+ }
+
+ // Verify all 3 servers are registered
+ List<DruidServerMetadata> allServers =
serverSelector.getAllServers(CloneQueryMode.EXCLUDECLONES);
+ Assert.assertEquals(3, allServers.size());
+
+ Assert.assertEquals(p0, serverSelector.pick(null,
CloneQueryMode.EXCLUDECLONES));
+ Assert.assertEquals(p0, serverSelector.pick(SAMPLE_GROUPBY_QUERY,
CloneQueryMode.EXCLUDECLONES));
+
+ Assert.assertEquals(
+ "Only p0 should be returned when 1 candidate is requested",
+ List.of(p0.getServer().getMetadata()),
+ serverSelector.getCandidates(1, CloneQueryMode.EXCLUDECLONES)
+ );
+
+ Assert.assertEquals(
+ "Only p0 and p2 should be returned, pNeg1 shouldn't be a candidate",
+ List.of(p0.getServer().getMetadata(), p2.getServer().getMetadata()),
+ serverSelector.getCandidates(3, CloneQueryMode.EXCLUDECLONES)
+ );
+ }
+
+ @Test
+ public void testStrictTierSelectorStrategyNoMatchingPriorities()
+ {
+ DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
+ QueryableDruidServer p0 = new QueryableDruidServer(
+ new DruidServer("test1", "localhost", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 0),
+ client
+ );
+ QueryableDruidServer p1 = new QueryableDruidServer(
+ new DruidServer("test2", "localhost", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 1),
+ client
+ );
+ QueryableDruidServer p2 = new QueryableDruidServer(
+ new DruidServer("test3", "localhost", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 2),
+ client
+ );
+
+ final ServerSelector serverSelector = new ServerSelector(
+
DataSegment.builder(SegmentId.dummy("foo")).shardSpec(NoneShardSpec.instance()).build(),
+ new StrictTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new StrictTierSelectorStrategyConfig(List.of(5, 6)),
+ serviceEmitter
+ ),
+ HistoricalFilter.IDENTITY_FILTER
+ );
+
+ for (QueryableDruidServer server : List.of(p0, p1, p2)) {
+ serverSelector.addServerAndUpdateSegment(server,
serverSelector.getSegment());
+ }
+
+ // Should return null when no matching priorities
+ Assert.assertNull(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES));
+ Assert.assertNull(serverSelector.pick(SAMPLE_GROUPBY_QUERY,
CloneQueryMode.EXCLUDECLONES));
+
+ serviceEmitter.verifyEmitted("tierSelector/noServer", 1);
+
+ // Should return empty list for getCandidates
+ Assert.assertEquals(Collections.emptyList(),
serverSelector.getCandidates(2, CloneQueryMode.EXCLUDECLONES));
+ }
+
+ @Test
+ public void testEmptyStrictTierPrioritiesThrowsException()
+ {
+ DruidException ex = Assert.assertThrows(
+ DruidException.class,
+ () -> new StrictTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new StrictTierSelectorStrategyConfig(List.of()),
+ serviceEmitter
+ )
+ );
+ Assert.assertEquals(
+ "priorities must be non-empty when using strict tier selector on the
Broker. Found priorities[[]].",
+ ex.getMessage()
+ );
+ }
+
+ @Test
+ public void testPooledTierSelectorWithRandomServerStrategy()
+ {
+ DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
+ QueryableDruidServer pNeg1 = new QueryableDruidServer(
+ new DruidServer("test1", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, -1),
+ client
+ );
+ QueryableDruidServer p0 = new QueryableDruidServer(
+ new DruidServer("test2", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
+ client
+ );
+ QueryableDruidServer p2 = new QueryableDruidServer(
+ new DruidServer("test3", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 2),
+ client
+ );
+
+ TierSelectorStrategy strategy = new PooledTierSelectorStrategy(
+ new RandomServerSelectorStrategy(),
+ new PooledTierSelectorStrategyConfig(Set.of(2, 0, -1)),
+ serviceEmitter
+ );
+
+ final ServerSelector serverSelector = new ServerSelector(
+
DataSegment.builder(SegmentId.dummy("foo")).shardSpec(NoneShardSpec.instance()).build(),
+ strategy,
+ HistoricalFilter.IDENTITY_FILTER
+ );
+
+ List<QueryableDruidServer> servers = List.of(pNeg1, p0, p2);
+ for (QueryableDruidServer server : servers) {
+ serverSelector.addServerAndUpdateSegment(server,
serverSelector.getSegment());
+ }
+
+ // All 3 servers should be configured
+ List<DruidServerMetadata> allServers =
serverSelector.getAllServers(CloneQueryMode.EXCLUDECLONES);
+ Assert.assertEquals(3, allServers.size());
+ Set<Integer> priorities = allServers.stream()
+ .map(DruidServerMetadata::getPriority)
+ .collect(Collectors.toSet());
+ Assert.assertEquals(Set.of(-1, 0, 2), priorities);
+
+ // Test getCandidates with different sizes - verify correct count
+ List<DruidServerMetadata> candidates1 = serverSelector.getCandidates(1,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertEquals(1, candidates1.size());
+ Assert.assertTrue(Set.of(-1, 0,
2).contains(candidates1.get(0).getPriority()));
+
+ List<DruidServerMetadata> candidates2 = serverSelector.getCandidates(2,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertEquals(2, candidates2.size());
+
+ List<DruidServerMetadata> candidates3 = serverSelector.getCandidates(3,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertEquals(3, candidates3.size());
+ Set<Integer> candidates3Priorities = candidates3.stream()
+
.map(DruidServerMetadata::getPriority)
+
.collect(Collectors.toSet());
+ Assert.assertEquals(Set.of(-1, 0, 2), candidates3Priorities);
+
+ // Pick should return one of the three servers (any priority -
demonstrates flattening)
+ QueryableDruidServer picked = serverSelector.pick(null,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertNotNull(picked);
+ Assert.assertTrue(
+ "Picked server should have one of the configured priorities",
+ picked.getServer().getPriority() == -1 ||
+ picked.getServer().getPriority() == 0 ||
+ picked.getServer().getPriority() == 2
+ );
+
+ // Pick multiple times to verify servers from flattened pool are accessible
+ Set<Integer> pickedPriorities = new HashSet<>();
+ for (int i = 0; i < 20; i++) {
+ QueryableDruidServer server = serverSelector.pick(null,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertNotNull(server);
+ pickedPriorities.add(server.getServer().getPriority());
+ }
+ // With RandomServerSelectorStrategy and 20 picks from 3 servers, we
should see multiple priorities
+ Assert.assertTrue(
+ "Expected to see servers from multiple priorities, but only saw: " +
pickedPriorities,
+ pickedPriorities.size() >= 2
+ );
+ }
+
+ @Test
+ public void testPooledTierSelectorWithConnectionCountServerStrategy()
+ {
+ AtomicInteger c0 = new AtomicInteger(20);
+ AtomicInteger c1 = new AtomicInteger(25);
+ AtomicInteger c2a = new AtomicInteger(1);
+ AtomicInteger c2b = new AtomicInteger(2);
+
+ DirectDruidClient client0 = EasyMock.createMock(DirectDruidClient.class);
+
EasyMock.expect(client0.getNumOpenConnections()).andAnswer(c0::get).anyTimes();
+
+ DirectDruidClient client1 = EasyMock.createMock(DirectDruidClient.class);
+
EasyMock.expect(client1.getNumOpenConnections()).andAnswer(c1::get).anyTimes();
+
+ DirectDruidClient client2a = EasyMock.createMock(DirectDruidClient.class);
+
EasyMock.expect(client2a.getNumOpenConnections()).andAnswer(c2a::get).anyTimes();
+
+ DirectDruidClient client2b = EasyMock.createMock(DirectDruidClient.class);
+
EasyMock.expect(client2b.getNumOpenConnections()).andAnswer(c2b::get).anyTimes();
+
+ EasyMock.replay(client0, client1, client2a, client2b);
+
+ QueryableDruidServer p0 = new QueryableDruidServer(
+ new DruidServer("p0", "localhost:8001", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 0),
+ client0
+ );
+ QueryableDruidServer p1 = new QueryableDruidServer(
+ new DruidServer("p1", "localhost:8002", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 1),
+ client1
+ );
+ QueryableDruidServer p2a = new QueryableDruidServer(
+ new DruidServer("p2a", "localhost:8003", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 2),
+ client2a
+ );
+ QueryableDruidServer p2b = new QueryableDruidServer(
+ new DruidServer("p2b", "localhost:8004", null, 0, null,
ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_TIER, 2),
+ client2b
+ );
+
+ TierSelectorStrategy strategy = new PooledTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new PooledTierSelectorStrategyConfig(Set.of(0, 1, 2)),
+ serviceEmitter
+ );
+
+ ServerSelector selector = new ServerSelector(
+
DataSegment.builder(SegmentId.dummy("foo")).shardSpec(NoneShardSpec.instance()).build(),
+ strategy,
+ HistoricalFilter.IDENTITY_FILTER
+ );
+
+ for (QueryableDruidServer s : List.of(p0, p1, p2a, p2b)) {
+ selector.addServerAndUpdateSegment(s, selector.getSegment());
+ }
+
+ for (int i = 0; i < 20; i++) {
+ QueryableDruidServer picked = selector.pick(null,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertNotNull(picked);
+
+ // Should always be one of the priority-2 replicas
+ Assert.assertEquals(2, picked.getServer().getPriority());
+
+ if (picked.getServer().equals(p2a.getServer())) {
+ c2a.incrementAndGet();
+ } else if (picked.getServer().equals(p2b.getServer())) {
+ c2b.incrementAndGet();
+ } else {
+ Assert.fail("Expected pick to be either p2a or p2b but got: " +
picked);
+ }
+ }
+
+ // Now we just verify that all the servers are picked at least once
+ final Set<DruidServer> pickedServers = new HashSet<>();
+ for (int i = 0; i < 50; i++) {
+ QueryableDruidServer picked = selector.pick(null,
CloneQueryMode.EXCLUDECLONES);
+ Assert.assertNotNull(picked);
+
+ DruidServer pickedServer = picked.getServer();
+ pickedServers.add(pickedServer);
+
+ if (pickedServer.equals(p0.getServer())) {
+ c0.incrementAndGet();
+ } else if (pickedServer.equals(p1.getServer())) {
+ c1.incrementAndGet();
+ } else if (pickedServer.equals(p2a.getServer())) {
+ c2a.incrementAndGet();
+ } else {
+ c2b.incrementAndGet();
+ }
+ }
+
+ Assert.assertEquals(4, pickedServers.size());
+
+ EasyMock.verify(client0, client1, client2a, client2b);
+ }
+
+ @Test
+ public void testPooledTierSelectorStrategyWithNoServerCandidatesInPool()
+ {
+ DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
+ QueryableDruidServer p0 = new QueryableDruidServer(
+ new DruidServer("test1", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
+ client
+ );
+ QueryableDruidServer p1 = new QueryableDruidServer(
+ new DruidServer("test2", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 1),
+ client
+ );
+ QueryableDruidServer p2 = new QueryableDruidServer(
+ new DruidServer("test3", "localhost", null, 0, null,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 2),
+ client
+ );
+
+ TierSelectorStrategy strategy = new PooledTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new PooledTierSelectorStrategyConfig(Set.of(5, 6)),
+ serviceEmitter
+ );
+
+ final ServerSelector serverSelector = new ServerSelector(
+
DataSegment.builder(SegmentId.dummy("foo")).shardSpec(NoneShardSpec.instance()).build(),
+ strategy,
+ HistoricalFilter.IDENTITY_FILTER
+ );
+
+ List<QueryableDruidServer> servers = List.of(p0, p1, p2);
+ for (QueryableDruidServer server : servers) {
+ serverSelector.addServerAndUpdateSegment(server,
serverSelector.getSegment());
+ }
+
+ // Should return null since there are no matching priorities
+ Assert.assertNull(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES));
+ Assert.assertNull(serverSelector.pick(SAMPLE_GROUPBY_QUERY,
CloneQueryMode.EXCLUDECLONES));
+
+ Assert.assertEquals(List.of(), serverSelector.getCandidates(1,
CloneQueryMode.EXCLUDECLONES));
+ Assert.assertEquals(List.of(), serverSelector.getCandidates(2,
CloneQueryMode.EXCLUDECLONES));
+ }
+
+ @Test
+ public void testEmptyPooledTierPrioritiesThrowsException()
+ {
+ DruidException ex = Assert.assertThrows(
+ DruidException.class,
+ () -> new PooledTierSelectorStrategy(
+ new ConnectionCountServerSelectorStrategy(),
+ new PooledTierSelectorStrategyConfig(Set.of()),
+ serviceEmitter
+ )
+ );
+ Assert.assertEquals(
+ "priorities must be non-empty when using pooled tier selector on the
Broker. Found priorities[[]].",
+ ex.getMessage()
+ );
+ }
}
diff --git
a/services/src/main/java/org/apache/druid/cli/BrokerRealtimeSelectorModule.java
b/services/src/main/java/org/apache/druid/cli/BrokerRealtimeSelectorModule.java
index 5d5aad7f82a..0e7ca65f3f6 100644
---
a/services/src/main/java/org/apache/druid/cli/BrokerRealtimeSelectorModule.java
+++
b/services/src/main/java/org/apache/druid/cli/BrokerRealtimeSelectorModule.java
@@ -29,9 +29,13 @@ import com.google.inject.name.Names;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.selector.CustomTierSelectorStrategy;
import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
+import org.apache.druid.client.selector.PooledTierSelectorStrategy;
+import org.apache.druid.client.selector.PooledTierSelectorStrategyConfig;
import org.apache.druid.client.selector.PreferredTierSelectorStrategy;
import org.apache.druid.client.selector.PreferredTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
+import org.apache.druid.client.selector.StrictTierSelectorStrategy;
+import org.apache.druid.client.selector.StrictTierSelectorStrategyConfig;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.JsonConfigurator;
@@ -39,6 +43,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import javax.inject.Named;
import java.util.Properties;
@@ -153,6 +158,24 @@ public class BrokerRealtimeSelectorModule implements
DruidModule
log.info("Creating PreferredTierSelectorStrategy for realtime servers
with config[%s]", config);
return new
PreferredTierSelectorStrategy(realtimeServerSelectorStrategy, config);
+ } else if (StrictTierSelectorStrategy.TYPE.equals(realtimeTier)) {
+ final StrictTierSelectorStrategyConfig config =
configurator.configurate(
+ properties,
+ REALTIME_SELECT_TIER_PROPERTY + "." + realtimeTier,
+ StrictTierSelectorStrategyConfig.class
+ );
+
+ log.info("Creating StrictTierSelectorStrategy for realtime servers
with config[%s]", config);
+ return new StrictTierSelectorStrategy(realtimeServerSelectorStrategy,
config, injector.getInstance(ServiceEmitter.class));
+ } else if (PooledTierSelectorStrategy.TYPE.equals(realtimeTier)) {
+ final PooledTierSelectorStrategyConfig config =
configurator.configurate(
+ properties,
+ REALTIME_SELECT_TIER_PROPERTY + "." + realtimeTier,
+ PooledTierSelectorStrategyConfig.class
+ );
+
+ log.info("Creating PooledTierSelectorStrategy for realtime servers
with config[%s]", config);
+ return new PooledTierSelectorStrategy(realtimeServerSelectorStrategy,
config, injector.getInstance(ServiceEmitter.class));
} else {
return configurator.configurate(properties,
"druid.broker.realtime.select", TierSelectorStrategy.class);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java
b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index be38a7ff030..b9e39479b06 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -38,8 +38,10 @@ import org.apache.druid.client.QueryableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
+import org.apache.druid.client.selector.PooledTierSelectorStrategyConfig;
import org.apache.druid.client.selector.PreferredTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
+import org.apache.druid.client.selector.StrictTierSelectorStrategyConfig;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
@@ -156,6 +158,8 @@ public class CliBroker extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.broker.select",
TierSelectorStrategy.class);
JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom",
CustomTierSelectorStrategyConfig.class);
JsonConfigProvider.bind(binder,
"druid.broker.select.tier.preferred",
PreferredTierSelectorStrategyConfig.class);
+ JsonConfigProvider.bind(binder, "druid.broker.select.tier.strict",
StrictTierSelectorStrategyConfig.class);
+ JsonConfigProvider.bind(binder, "druid.broker.select.tier.pooled",
PooledTierSelectorStrategyConfig.class);
JsonConfigProvider.bind(binder, "druid.broker.balancer",
ServerSelectorStrategy.class);
JsonConfigProvider.bind(binder, "druid.broker.retryPolicy",
RetryQueryRunnerConfig.class);
diff --git a/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java
b/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java
index 66cb6a1a39d..ad7c14c3a57 100644
--- a/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java
+++ b/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.cli;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
import org.apache.druid.client.BrokerServerView;
@@ -29,10 +30,12 @@ import
org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
import org.apache.druid.client.selector.CustomTierSelectorStrategy;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.LowestPriorityTierSelectorStrategy;
+import org.apache.druid.client.selector.PooledTierSelectorStrategy;
import org.apache.druid.client.selector.PreferredTierSelectorStrategy;
import org.apache.druid.client.selector.PreferredTierSelectorStrategyConfig;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelectorStrategy;
+import org.apache.druid.client.selector.StrictTierSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.LazySingleton;
@@ -269,6 +272,164 @@ public class CliBrokerTest
Assert.assertSame(realtimeBalancer, historicalBalancer);
}
+ @Test
+ public void testRealtimeStrictStrategy()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.broker.realtime.select.tier", "strict");
+
properties.setProperty("druid.broker.realtime.select.tier.strict.priorities",
"[2,1,0]");
+ properties.setProperty("druid.broker.balancer.type", "random");
+
+ final Injector injector = makeBrokerInjector(properties);
+
+ final TierSelectorStrategy realtime = injector.getInstance(
+ Key.get(TierSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+
+ Assert.assertTrue(realtime instanceof StrictTierSelectorStrategy);
+ Assert.assertEquals(List.of(2, 1, 0), ((StrictTierSelectorStrategy)
realtime).getConfig().getPriorities());
+
+ // Historical should use default (highest priority)
+ Assert.assertTrue(injector.getInstance(TierSelectorStrategy.class)
instanceof HighestPriorityTierSelectorStrategy);
+
+ final ServerSelectorStrategy realtimeBalancer = injector.getInstance(
+ Key.get(ServerSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+ Assert.assertTrue(realtimeBalancer instanceof
RandomServerSelectorStrategy);
+ }
+
+ @Test
+ public void testStrictStrategyForHistoricalAndRealtimeServers()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.broker.select.tier", "strict");
+ properties.setProperty("druid.broker.select.tier.strict.priorities",
"[0,1]");
+ properties.setProperty("druid.broker.balancer.type", "random");
+
+ properties.setProperty("druid.broker.realtime.select.tier", "strict");
+
properties.setProperty("druid.broker.realtime.select.tier.strict.priorities",
"[2,1,0]");
+ properties.setProperty("druid.broker.realtime.balancer.type",
"connectionCount");
+
+ final Injector injector = makeBrokerInjector(properties);
+ final TierSelectorStrategy historical =
injector.getInstance(TierSelectorStrategy.class);
+ final TierSelectorStrategy realtime = injector.getInstance(
+ Key.get(TierSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+
+ Assert.assertTrue(historical instanceof StrictTierSelectorStrategy);
+ Assert.assertEquals(List.of(0, 1), ((StrictTierSelectorStrategy)
historical).getConfig().getPriorities());
+
+ Assert.assertTrue(realtime instanceof StrictTierSelectorStrategy);
+ Assert.assertEquals(List.of(2, 1, 0), ((StrictTierSelectorStrategy)
realtime).getConfig().getPriorities());
+ }
+
+ @Test
+ public void testEmptyStrictPrioritiesThrowsException()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.broker.select.tier", "strict");
+ properties.setProperty("druid.broker.realtime.select.tier", "strict");
+
+ final Injector injector = makeBrokerInjector(properties);
+ ProvisionException e1 = Assert.assertThrows(ProvisionException.class, ()
-> injector.getInstance(TierSelectorStrategy.class));
+ Assert.assertTrue(e1.getMessage().contains(
+ "Problem parsing object at prefix[druid.broker.select.tier.strict]:
Cannot construct instance of"
+ + " `StrictTierSelectorStrategyConfig`, problem: priorities must be
non-empty when using strict tier selector on the Broker. Found
priorities[null]."
+ ));
+
+ ProvisionException e2 = Assert.assertThrows(ProvisionException.class, ()
-> injector.getInstance(
+ Key.get(TierSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ ));
+ Assert.assertTrue(e2.getMessage().contains(
+ "Problem parsing object at
prefix[druid.broker.realtime.select.tier.strict]: Cannot construct instance of"
+ + " `StrictTierSelectorStrategyConfig`, problem: priorities must be
non-empty when using strict tier selector on the Broker. Found
priorities[null]."
+ ));
+ }
+
+ @Test
+ public void testHistoricalAndRealtimePooledStrategies()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.broker.select.tier", "pooled");
+ properties.setProperty("druid.broker.select.tier.pooled.priorities",
"[0]");
+ properties.setProperty("druid.broker.balancer.type", "random");
+
+ properties.setProperty("druid.broker.realtime.select.tier", "pooled");
+
properties.setProperty("druid.broker.realtime.select.tier.pooled.priorities",
"[2,1]");
+ properties.setProperty("druid.broker.realtime.balancer.type",
"connectionCount");
+
+ final Injector injector = makeBrokerInjector(properties);
+ final TierSelectorStrategy historical =
injector.getInstance(TierSelectorStrategy.class);
+ final TierSelectorStrategy realtime = injector.getInstance(
+ Key.get(TierSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+
+ // Verify tier selector strategies with strategy-specific config paths
+ Assert.assertTrue(historical instanceof PooledTierSelectorStrategy);
+ Assert.assertEquals(Set.of(0), ((PooledTierSelectorStrategy)
historical).getConfig().getPriorities());
+
+ Assert.assertTrue(realtime instanceof PooledTierSelectorStrategy);
+ Assert.assertEquals(Set.of(2, 1), ((PooledTierSelectorStrategy)
realtime).getConfig().getPriorities());
+
+ // Verify different server selector strategies
+ final ServerSelectorStrategy historicalBalancer =
injector.getInstance(ServerSelectorStrategy.class);
+ final ServerSelectorStrategy realtimeBalancer = injector.getInstance(
+ Key.get(ServerSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+
+ Assert.assertTrue(historicalBalancer instanceof
RandomServerSelectorStrategy);
+ Assert.assertTrue(realtimeBalancer instanceof
ConnectionCountServerSelectorStrategy);
+ }
+
+ @Test
+ public void testRealtimePooledStrategy()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.broker.realtime.select.tier", "pooled");
+
properties.setProperty("druid.broker.realtime.select.tier.pooled.priorities",
"[2,1,0]");
+ properties.setProperty("druid.broker.balancer.type", "random");
+
+ final Injector injector = makeBrokerInjector(properties);
+
+ final TierSelectorStrategy realtime = injector.getInstance(
+ Key.get(TierSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+
+ Assert.assertTrue(realtime instanceof PooledTierSelectorStrategy);
+ Assert.assertEquals(Set.of(2, 1, 0), ((PooledTierSelectorStrategy)
realtime).getConfig().getPriorities());
+
+ // Historical should use default (highest priority)
+ Assert.assertTrue(injector.getInstance(TierSelectorStrategy.class)
instanceof HighestPriorityTierSelectorStrategy);
+
+ final ServerSelectorStrategy realtimeBalancer = injector.getInstance(
+ Key.get(ServerSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ );
+ Assert.assertTrue(realtimeBalancer instanceof
RandomServerSelectorStrategy);
+ }
+
+ @Test
+ public void testEmptyPooledPrioritiesThrowsException()
+ {
+ final Properties properties = new Properties();
+ properties.setProperty("druid.broker.select.tier", "pooled");
+ properties.setProperty("druid.broker.realtime.select.tier", "pooled");
+
+ final Injector injector = makeBrokerInjector(properties);
+ ProvisionException e1 = Assert.assertThrows(ProvisionException.class, ()
-> injector.getInstance(TierSelectorStrategy.class));
+ Assert.assertTrue(e1.getMessage().contains(
+ "Problem parsing object at prefix[druid.broker.select.tier.pooled]:
Cannot construct instance of"
+ + " `PooledTierSelectorStrategyConfig`, problem: priorities must be
non-empty when using pooled tier selector on the Broker. Found
priorities[null]."
+ ));
+
+ ProvisionException e2 = Assert.assertThrows(ProvisionException.class, ()
-> injector.getInstance(
+ Key.get(TierSelectorStrategy.class,
Names.named(BrokerServerView.REALTIME_SELECTOR))
+ ));
+ Assert.assertTrue(e2.getMessage().contains(
+ "Problem parsing object at
prefix[druid.broker.realtime.select.tier.pooled]: Cannot construct instance of"
+ + " `PooledTierSelectorStrategyConfig`, problem: priorities must be
non-empty when using pooled tier selector on the Broker. Found
priorities[null]."
+ ));
+ }
+
private Injector makeBrokerInjector(final Properties props)
{
final Injector baseInjector = Guice.createInjector(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]