This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch configurable_broker_realtime_tier_strategy
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 5e5a3224a95f7e6e68fe44741b2fe0964f324dc3
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Thu Feb 26 13:40:44 2026 -0800

    Make druid.broker.select.realtime.tier configurable for tasks.
    
    Currently the BrokerServerView shares the same strategy configured by
    druid.broker.select.tier for both historicals and realtime servers.
    
    This patch allows operators to optionally override that behavior based on how
    realtime servers are set up via the druid.broker.select.realtime.tier property.
    
    If this property isn't specified, the realtime servers' strategy will continue
    to share the historical's strategy (backwards compatible).
---
 .../materializedview/DatasourceOptimizerTest.java  |   1 +
 .../org/apache/druid/client/BrokerServerView.java  |  20 ++-
 .../selector/CustomTierSelectorStrategy.java       |  17 ++
 .../selector/CustomTierSelectorStrategyConfig.java |   8 +
 .../HighestPriorityTierSelectorStrategy.java       |   8 +
 .../LowestPriorityTierSelectorStrategy.java        |   8 +
 .../selector/PreferredTierSelectorStrategy.java    |  16 ++
 .../PreferredTierSelectorStrategyConfig.java       |   9 +
 .../druid/client/selector/ServerSelector.java      |  44 +++--
 .../client/selector/TierSelectorStrategy.java      |   8 +-
 .../apache/druid/client/BrokerServerViewTest.java  |   1 +
 .../cli/BrokerRealtimeTierSelectorModule.java      | 118 ++++++++++++++
 .../main/java/org/apache/druid/cli/CliBroker.java  |   4 +-
 .../java/org/apache/druid/cli/CliBrokerTest.java   | 181 +++++++++++++++++++++
 .../BrokerSegmentMetadataCacheConcurrencyTest.java |   1 +
 15 files changed, 419 insertions(+), 25 deletions(-)

diff --git 
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
 
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
index d76242ac98c..f9b67045b49 100644
--- 
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
+++ 
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
@@ -330,6 +330,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase
         druidClientFactory,
         baseView,
         new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy()),
+        null,
         new NoopServiceEmitter(),
         new BrokerSegmentWatcherConfig(),
         filter
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java 
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 95dc8636725..bbb60c8a6e3 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -42,6 +42,8 @@ import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 
+import javax.annotation.Nullable;
+import javax.inject.Named;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -61,6 +63,7 @@ import java.util.stream.Collectors;
 @ManageLifecycle
 public class BrokerServerView implements TimelineServerView
 {
+  public static final String REALTIME_SELECTOR = "realtime";
   private static final Logger log = new Logger(BrokerServerView.class);
 
   private final Object lock = new Object();
@@ -69,7 +72,8 @@ public class BrokerServerView implements TimelineServerView
   private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> 
timelines = new HashMap<>();
   private final ConcurrentMap<TimelineCallback, Executor> timelineCallbacks = 
new ConcurrentHashMap<>();
   private final QueryableDruidServer.Maker druidClientFactory;
-  private final TierSelectorStrategy tierSelectorStrategy;
+  private final TierSelectorStrategy historicalTierSelectorStrategy;
+  private final TierSelectorStrategy realtimeTierSelectorStrategy;
   private final ServiceEmitter emitter;
   private final BrokerSegmentWatcherConfig segmentWatcherConfig;
   private final Predicate<Pair<DruidServerMetadata, DataSegment>> 
segmentFilter;
@@ -81,7 +85,8 @@ public class BrokerServerView implements TimelineServerView
   public BrokerServerView(
       final QueryableDruidServer.Maker directDruidClientFactory,
       final FilteredServerInventoryView baseView,
-      final TierSelectorStrategy tierSelectorStrategy,
+      final TierSelectorStrategy historicalTierSelectorStrategy,
+      @Named(REALTIME_SELECTOR) @Nullable final TierSelectorStrategy 
realtimeTierSelectorStrategy,
       final ServiceEmitter emitter,
       final BrokerSegmentWatcherConfig segmentWatcherConfig,
       final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig
@@ -89,7 +94,11 @@ public class BrokerServerView implements TimelineServerView
   {
     this.druidClientFactory = directDruidClientFactory;
     this.baseView = baseView;
-    this.tierSelectorStrategy = tierSelectorStrategy;
+    this.historicalTierSelectorStrategy = historicalTierSelectorStrategy;
+    // If realtime selector strategy is null, then default to historical. This 
was the behavior prior to introduction of realtime strategy configuration.
+    this.realtimeTierSelectorStrategy = realtimeTierSelectorStrategy == null ? 
historicalTierSelectorStrategy : realtimeTierSelectorStrategy;
+    log.info("Using historicalTierSelectorStrategy[%s] and 
realtimeTierSelectorStrategy[%s]", historicalTierSelectorStrategy, 
realtimeTierSelectorStrategy);
+
     this.emitter = emitter;
     this.brokerViewOfCoordinatorConfig = brokerViewOfCoordinatorConfig;
 
@@ -159,7 +168,8 @@ public class BrokerServerView implements TimelineServerView
 
     baseView.registerServerCallback(
         exec,
-        new ServerCallback() {
+        new ServerCallback()
+        {
           @Override
           public CallbackAction serverAdded(DruidServer server)
           {
@@ -270,7 +280,7 @@ public class BrokerServerView implements TimelineServerView
         log.debug("Adding segment[%s] for server[%s]", segment, server);
         ServerSelector selector = selectors.get(segmentId);
         if (selector == null) {
-          selector = new ServerSelector(segment, tierSelectorStrategy, 
brokerViewOfCoordinatorConfig);
+          selector = new ServerSelector(segment, 
historicalTierSelectorStrategy, realtimeTierSelectorStrategy, 
brokerViewOfCoordinatorConfig);
 
           VersionedIntervalTimeline<String, ServerSelector> timeline = 
timelines.get(segment.getDataSource());
           if (timeline == null) {
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategy.java
 
b/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategy.java
index 68a7ce1d298..28894c8f009 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategy.java
@@ -30,7 +30,10 @@ import java.util.Map;
  */
 public class CustomTierSelectorStrategy extends AbstractTierSelectorStrategy
 {
+  public static final String TYPE = "custom";
+
   private final Comparator<Integer> comparator;
+  private final CustomTierSelectorStrategyConfig config;
 
   @JsonCreator
   public CustomTierSelectorStrategy(
@@ -39,6 +42,7 @@ public class CustomTierSelectorStrategy extends 
AbstractTierSelectorStrategy
   )
   {
     super(serverSelectorStrategy);
+    this.config = config;
 
     final Map<Integer, Integer> lookup = new HashMap<>();
     int pos = 0;
@@ -73,4 +77,17 @@ public class CustomTierSelectorStrategy extends 
AbstractTierSelectorStrategy
   {
     return comparator;
   }
+
+  public CustomTierSelectorStrategyConfig getConfig()
+  {
+    return config;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CustomTierSelectorStrategy{" +
+           "config=" + config +
+           '}';
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategyConfig.java
 
b/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategyConfig.java
index 3766daf178a..7e70b1704a9 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategyConfig.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/CustomTierSelectorStrategyConfig.java
@@ -35,4 +35,12 @@ public class CustomTierSelectorStrategyConfig
   {
     return priorities;
   }
+
+  @Override
+  public String toString()
+  {
+    return "CustomTierSelectorStrategyConfig{" +
+           "priorities=" + priorities +
+           '}';
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/HighestPriorityTierSelectorStrategy.java
 
b/server/src/main/java/org/apache/druid/client/selector/HighestPriorityTierSelectorStrategy.java
index 6d5b5a6c6fd..d461da7dcc3 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/HighestPriorityTierSelectorStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/HighestPriorityTierSelectorStrategy.java
@@ -28,6 +28,8 @@ import java.util.Comparator;
  */
 public class HighestPriorityTierSelectorStrategy extends 
AbstractTierSelectorStrategy
 {
+  public static final String TYPE = "highestPriority";
+
   @JsonCreator
   public HighestPriorityTierSelectorStrategy(@JacksonInject 
ServerSelectorStrategy serverSelectorStrategy)
   {
@@ -39,4 +41,10 @@ public class HighestPriorityTierSelectorStrategy extends 
AbstractTierSelectorStr
   {
     return Comparator.reverseOrder();
   }
+
+  @Override
+  public String toString()
+  {
+    return "HighestPriorityTierSelectorStrategy{}";
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/LowestPriorityTierSelectorStrategy.java
 
b/server/src/main/java/org/apache/druid/client/selector/LowestPriorityTierSelectorStrategy.java
index 311881662b5..8896ac1a144 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/LowestPriorityTierSelectorStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/LowestPriorityTierSelectorStrategy.java
@@ -28,6 +28,8 @@ import java.util.Comparator;
  */
 public class LowestPriorityTierSelectorStrategy extends 
AbstractTierSelectorStrategy
 {
+  public static final String TYPE = "lowestPriority";
+
   @JsonCreator
   public LowestPriorityTierSelectorStrategy(@JacksonInject 
ServerSelectorStrategy serverSelectorStrategy)
   {
@@ -39,4 +41,10 @@ public class LowestPriorityTierSelectorStrategy extends 
AbstractTierSelectorStra
   {
     return Comparator.naturalOrder();
   }
+
+  @Override
+  public String toString()
+  {
+    return "LowestPriorityTierSelectorStrategy{}";
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategy.java
 
b/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategy.java
index 8287830d0c5..cb53bb6e3db 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategy.java
@@ -36,10 +36,12 @@ import java.util.Set;
 
 public class PreferredTierSelectorStrategy extends AbstractTierSelectorStrategy
 {
+  public static final String TYPE = "preferred";
   private static final Logger log = new 
Logger(PreferredTierSelectorStrategy.class);
 
   private final String preferredTier;
   private final TierSelectorStrategy priorityStrategy;
+  private final PreferredTierSelectorStrategyConfig config;
 
   public PreferredTierSelectorStrategy(
       @JacksonInject ServerSelectorStrategy serverSelectorStrategy,
@@ -47,6 +49,7 @@ public class PreferredTierSelectorStrategy extends 
AbstractTierSelectorStrategy
   )
   {
     super(serverSelectorStrategy);
+    this.config = config;
     this.preferredTier = config.getTier();
 
     if (config.getPriority() == null) {
@@ -111,4 +114,17 @@ public class PreferredTierSelectorStrategy extends 
AbstractTierSelectorStrategy
 
     return picks;
   }
+
+  public PreferredTierSelectorStrategyConfig getConfig()
+  {
+    return config;
+  }
+
+  @Override
+  public String toString()
+  {
+    // config is the only field rendered, so it must not be preceded by a ", " separator.
+    return "PreferredTierSelectorStrategy{" +
+           "config=" + config + '}';
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategyConfig.java
 
b/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategyConfig.java
index fda56641159..4b75ed9ce7f 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategyConfig.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/PreferredTierSelectorStrategyConfig.java
@@ -67,4 +67,13 @@ public class PreferredTierSelectorStrategyConfig
   {
     this.priority = priority;
   }
+
+  @Override
+  public String toString()
+  {
+    return "PreferredTierSelectorStrategyConfig{" +
+           "tier='" + tier + '\'' +
+           ", priority='" + priority + '\'' +
+           '}';
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java 
b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
index 4a26c0ed6fa..352be0f0789 100644
--- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Overshadowable;
+import org.jetbrains.annotations.VisibleForTesting;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -48,22 +49,35 @@ public class ServerSelector implements 
Overshadowable<ServerSelector>
   @GuardedBy("this")
   private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> realtimeServers;
 
-  private final TierSelectorStrategy strategy;
+  private final TierSelectorStrategy historicalTierStrategy;
+  private final TierSelectorStrategy realtimeTierStrategy;
 
   private final AtomicReference<DataSegment> segment;
 
   private final HistoricalFilter filter;
 
+  @VisibleForTesting
   public ServerSelector(
       DataSegment segment,
-      TierSelectorStrategy strategy,
+      TierSelectorStrategy historicalTierStrategy,
+      HistoricalFilter filter
+  )
+  {
+    this(segment, historicalTierStrategy, historicalTierStrategy, filter);
+  }
+
+  public ServerSelector(
+      DataSegment segment,
+      TierSelectorStrategy historicalTierStrategy,
+      TierSelectorStrategy realtimeTierStrategy,
       HistoricalFilter filter
   )
   {
     this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment));
-    this.strategy = strategy;
-    this.historicalServers = new 
Int2ObjectRBTreeMap<>(strategy.getComparator());
-    this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
+    this.historicalTierStrategy = historicalTierStrategy;
+    this.realtimeTierStrategy = realtimeTierStrategy;
+    this.historicalServers = new 
Int2ObjectRBTreeMap<>(historicalTierStrategy.getComparator());
+    this.realtimeServers = new 
Int2ObjectRBTreeMap<>(this.realtimeTierStrategy.getComparator());
     this.filter = filter;
   }
 
@@ -135,16 +149,16 @@ public class ServerSelector implements 
Overshadowable<ServerSelector>
     synchronized (this) {
       if (numCandidates > 0) {
         candidates = new ArrayList<>(numCandidates);
-        strategy.pick(filter.getQueryableServers(historicalServers, 
cloneQueryMode), segment.get(), numCandidates)
-            .stream()
-            .map(server -> server.getServer().getMetadata())
-            .forEach(candidates::add);
+        
historicalTierStrategy.pick(filter.getQueryableServers(historicalServers, 
cloneQueryMode), segment.get(), numCandidates)
+                              .stream()
+                              .map(server -> server.getServer().getMetadata())
+                              .forEach(candidates::add);
 
         if (candidates.size() < numCandidates) { //-V6007: false alarm due to 
a bug in PVS-Studio
-          strategy.pick(realtimeServers, segment.get(), numCandidates - 
candidates.size())
-              .stream()
-              .map(server -> server.getServer().getMetadata())
-              .forEach(candidates::add);
+          realtimeTierStrategy.pick(realtimeServers, segment.get(), 
numCandidates - candidates.size())
+                              .stream()
+                              .map(server -> server.getServer().getMetadata())
+                              .forEach(candidates::add);
         }
         return candidates;
       } else {
@@ -180,9 +194,9 @@ public class ServerSelector implements 
Overshadowable<ServerSelector>
   {
     synchronized (this) {
       if (!historicalServers.isEmpty()) {
-        return strategy.pick(query, 
filter.getQueryableServers(historicalServers, cloneQueryMode), segment.get());
+        return historicalTierStrategy.pick(query, 
filter.getQueryableServers(historicalServers, cloneQueryMode), segment.get());
       }
-      return strategy.pick(query, realtimeServers, segment.get());
+      return realtimeTierStrategy.pick(query, realtimeServers, segment.get());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
 
b/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
index 6eae7db4f34..0cf3b3740ee 100644
--- 
a/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java
@@ -35,10 +35,10 @@ import java.util.Set;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tier", defaultImpl = 
HighestPriorityTierSelectorStrategy.class)
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "highestPriority", value = 
HighestPriorityTierSelectorStrategy.class),
-    @JsonSubTypes.Type(name = "lowestPriority", value = 
LowestPriorityTierSelectorStrategy.class),
-    @JsonSubTypes.Type(name = "custom", value = 
CustomTierSelectorStrategy.class),
-    @JsonSubTypes.Type(name = "preferred", value = 
PreferredTierSelectorStrategy.class),
+    @JsonSubTypes.Type(name = HighestPriorityTierSelectorStrategy.TYPE, value 
= HighestPriorityTierSelectorStrategy.class),
+    @JsonSubTypes.Type(name = LowestPriorityTierSelectorStrategy.TYPE, value = 
LowestPriorityTierSelectorStrategy.class),
+    @JsonSubTypes.Type(name = CustomTierSelectorStrategy.TYPE, value = 
CustomTierSelectorStrategy.class),
+    @JsonSubTypes.Type(name = PreferredTierSelectorStrategy.TYPE, value = 
PreferredTierSelectorStrategy.class),
 })
 public interface TierSelectorStrategy
 {
diff --git 
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java 
b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 5055725bdd1..80a062ee2b7 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -703,6 +703,7 @@ public class BrokerServerViewTest extends CuratorTestBase
         druidClientFactory,
         baseView,
         new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy()),
+        null,
         new NoopServiceEmitter(),
         new BrokerSegmentWatcherConfig()
         {
diff --git 
a/services/src/main/java/org/apache/druid/cli/BrokerRealtimeTierSelectorModule.java
 
b/services/src/main/java/org/apache/druid/cli/BrokerRealtimeTierSelectorModule.java
new file mode 100644
index 00000000000..87e34b3b3c1
--- /dev/null
+++ 
b/services/src/main/java/org/apache/druid/cli/BrokerRealtimeTierSelectorModule.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.google.inject.Provider;
+import com.google.inject.name.Names;
+import org.apache.druid.client.BrokerServerView;
+import org.apache.druid.client.selector.CustomTierSelectorStrategy;
+import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
+import org.apache.druid.client.selector.PreferredTierSelectorStrategy;
+import org.apache.druid.client.selector.PreferredTierSelectorStrategyConfig;
+import org.apache.druid.client.selector.ServerSelectorStrategy;
+import org.apache.druid.client.selector.TierSelectorStrategy;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Properties;
+
+/**
+ * Guice module that configures the {@link TierSelectorStrategy} for {@link 
BrokerServerView#REALTIME_SELECTOR} tiers
+ * in the Broker. The strategy is determined by {@link 
BrokerRealtimeTierSelectorModule#PROPERTY}. If
+ * the property is not configured, then {@link 
RealtimeTierSelectorStrategyProvider#get()} returns
+ * null, which then falls back to the {@code druid.broker.select.tier} strategy in 
{@link BrokerServerView}.
+ */
+public class BrokerRealtimeTierSelectorModule implements DruidModule
+{
+  private static final Logger log = new 
Logger(BrokerRealtimeTierSelectorModule.class);
+
+  private static final String PROPERTY = "druid.broker.select.realtime.tier";
+
+  @Override
+  public void configure(Binder binder)
+  {
+    binder.bind(Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR)))
+          .toProvider(RealtimeTierSelectorStrategyProvider.class)
+          .in(LazySingleton.class);
+  }
+
+  private static class RealtimeTierSelectorStrategyProvider implements 
Provider<TierSelectorStrategy>
+  {
+    private final Properties properties;
+    private final JsonConfigurator configurator;
+    private final ServerSelectorStrategy serverSelectorStrategy;
+
+    @Inject
+    public RealtimeTierSelectorStrategyProvider(
+        Properties properties,
+        JsonConfigurator configurator,
+        ServerSelectorStrategy serverSelectorStrategy
+    )
+    {
+      this.properties = properties;
+      this.configurator = configurator;
+      this.serverSelectorStrategy = serverSelectorStrategy;
+    }
+
+    @Nullable
+    @Override
+    public TierSelectorStrategy get()
+    {
+      final String realtimeTier = properties.getProperty(PROPERTY);
+      if (realtimeTier == null) {
+        log.info("[%s] is not configured.", PROPERTY);
+        return null;
+      }
+
+      if (CustomTierSelectorStrategy.TYPE.equals(realtimeTier)) {
+        final CustomTierSelectorStrategyConfig config = 
configurator.configurate(
+            properties,
+            "druid.broker.select.realtime.tier.custom",
+            CustomTierSelectorStrategyConfig.class
+        );
+
+        log.info("Creating CustomTierSelectorStrategy for realtime servers 
with config[%s]", config);
+        return new CustomTierSelectorStrategy(serverSelectorStrategy, config);
+      } else if (PreferredTierSelectorStrategy.TYPE.equals(realtimeTier)) {
+        final PreferredTierSelectorStrategyConfig config = 
configurator.configurate(
+            properties,
+            "druid.broker.select.realtime.tier.preferred",
+            PreferredTierSelectorStrategyConfig.class
+        );
+
+        log.info("Creating PreferredTierSelectorStrategy for realtime servers 
with config[%s]", config);
+        return new PreferredTierSelectorStrategy(serverSelectorStrategy, 
config);
+      } else {
+        // For other strategies that don't need config, just fallback to this
+        return configurator.configurate(
+            properties,
+            "druid.broker.select.realtime",
+            TierSelectorStrategy.class
+        );
+      }
+    }
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java 
b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 99df36c0c43..08fe1cc5156 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -155,6 +155,7 @@ public class CliBroker extends ServerRunnable
           JsonConfigProvider.bind(binder, "druid.broker.select", 
TierSelectorStrategy.class);
           JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", 
CustomTierSelectorStrategyConfig.class);
           JsonConfigProvider.bind(binder, 
"druid.broker.select.tier.preferred", 
PreferredTierSelectorStrategyConfig.class);
+
           JsonConfigProvider.bind(binder, "druid.broker.balancer", 
ServerSelectorStrategy.class);
           JsonConfigProvider.bind(binder, "druid.broker.retryPolicy", 
RetryQueryRunnerConfig.class);
           JsonConfigProvider.bind(binder, "druid.broker.segment", 
BrokerSegmentWatcherConfig.class);
@@ -209,7 +210,8 @@ public class CliBroker extends ServerRunnable
         new MSQSqlModule(),
         new SqlTaskModule(),
         new DartControllerModule(),
-        new DartControllerMemoryManagementModule()
+        new DartControllerMemoryManagementModule(),
+        new BrokerRealtimeTierSelectorModule()
     );
   }
 }
diff --git a/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java 
b/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java
new file mode 100644
index 00000000000..1474e3f0285
--- /dev/null
+++ b/services/src/test/java/org/apache/druid/cli/CliBrokerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+import org.apache.druid.client.BrokerServerView;
+import org.apache.druid.client.selector.CustomTierSelectorStrategy;
+import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
+import org.apache.druid.client.selector.LowestPriorityTierSelectorStrategy;
+import org.apache.druid.client.selector.PreferredTierSelectorStrategy;
+import org.apache.druid.client.selector.TierSelectorStrategy;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.jackson.JacksonModule;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class CliBrokerTest
+{
+
+  @Test
+  public void testDefaultTierSelectorStrategy()
+  {
+    final Injector injector = makeBrokerInjector(new Properties());
+    Assert.assertTrue(injector.getInstance(TierSelectorStrategy.class) 
instanceof HighestPriorityTierSelectorStrategy);
+    Assert.assertNull(
+        injector.getInstance(Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR)))
+    );
+  }
+
+  @Test
+  public void testHistoricalLowestPriorityStrategy()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty("druid.broker.select.tier", "lowestPriority");
+
+    final Injector injector = makeBrokerInjector(properties);
+    Assert.assertTrue(injector.getInstance(TierSelectorStrategy.class) 
instanceof LowestPriorityTierSelectorStrategy);
+    Assert.assertNull(
+        injector.getInstance(Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR)))
+    );
+  }
+
+  @Test
+  public void testRealtimeCustomStrategy()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty("druid.broker.select.realtime.tier", "custom");
+    
properties.setProperty("druid.broker.select.realtime.tier.custom.priorities", 
"[2,1,0]");
+
+    final Injector injector = makeBrokerInjector(properties);
+
+    final TierSelectorStrategy realtime = injector.getInstance(
+        Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR))
+    );
+
+    Assert.assertTrue(realtime instanceof CustomTierSelectorStrategy);
+    Assert.assertEquals(List.of(2, 1, 0), ((CustomTierSelectorStrategy) 
realtime).getConfig().getPriorities());
+
+    Assert.assertTrue(injector.getInstance(TierSelectorStrategy.class) 
instanceof HighestPriorityTierSelectorStrategy);
+  }
+
+  @Test
+  public void testHistoricalAndRealtimeCustomStrategies()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty("druid.broker.select.tier", "custom");
+    properties.setProperty("druid.broker.select.tier.custom.priorities", 
"[0]");
+
+    properties.setProperty("druid.broker.select.realtime.tier", "custom");
+    
properties.setProperty("druid.broker.select.realtime.tier.custom.priorities", 
"[2,1]");
+
+    final Injector injector = makeBrokerInjector(properties);
+    final TierSelectorStrategy historical = 
injector.getInstance(TierSelectorStrategy.class);
+    final TierSelectorStrategy realtime = injector.getInstance(
+        Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR))
+    );
+
+    Assert.assertTrue(historical instanceof CustomTierSelectorStrategy);
+    Assert.assertEquals(List.of(0), ((CustomTierSelectorStrategy) 
historical).getConfig().getPriorities());
+
+    Assert.assertTrue(realtime instanceof CustomTierSelectorStrategy);
+    Assert.assertEquals(List.of(2, 1), ((CustomTierSelectorStrategy) 
realtime).getConfig().getPriorities());
+  }
+
+
+  @Test
+  public void testHistoricalAndRealtimeDifferentStrategies()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty("druid.broker.select.tier", "custom");
+    properties.setProperty("druid.broker.select.tier.custom.priorities", 
"[0]");
+
+    properties.setProperty("druid.broker.select.realtime.tier", 
"lowestPriority");
+
+    final Injector injector = makeBrokerInjector(properties);
+
+    final TierSelectorStrategy historical = 
injector.getInstance(TierSelectorStrategy.class);
+    Assert.assertTrue(historical instanceof CustomTierSelectorStrategy);
+    Assert.assertEquals(List.of(0), ((CustomTierSelectorStrategy) 
historical).getConfig().getPriorities());
+
+    final TierSelectorStrategy realtime = injector.getInstance(
+        Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR))
+    );
+    Assert.assertTrue(realtime instanceof LowestPriorityTierSelectorStrategy);
+  }
+
+  @Test
+  public void testPreferredTierDifferentPreferredStrategies()
+  {
+    final Properties properties = new Properties();
+    properties.setProperty("druid.broker.select.tier", "preferred");
+    properties.setProperty("druid.broker.select.tier.preferred.tier", 
"_default_tier");
+    properties.setProperty("druid.broker.select.tier.preferred.priority", 
"lowest");
+
+    properties.setProperty("druid.broker.select.realtime.tier", "preferred");
+    properties.setProperty("druid.broker.select.realtime.tier.preferred.tier", 
"realtime_tier");
+    
properties.setProperty("druid.broker.select.realtime.tier.preferred.priority", 
"highest");
+
+    final Injector injector = makeBrokerInjector(properties);
+    final TierSelectorStrategy historical = 
injector.getInstance(TierSelectorStrategy.class);
+    Assert.assertTrue(historical instanceof PreferredTierSelectorStrategy);
+    final PreferredTierSelectorStrategy historicalPreferredStrategy = 
(PreferredTierSelectorStrategy) historical;
+    Assert.assertEquals("_default_tier", 
historicalPreferredStrategy.getConfig().getTier());
+    Assert.assertEquals("lowest", 
historicalPreferredStrategy.getConfig().getPriority());
+
+    final TierSelectorStrategy realtime = injector.getInstance(
+        Key.get(TierSelectorStrategy.class, 
Names.named(BrokerServerView.REALTIME_SELECTOR))
+    );
+    Assert.assertTrue(realtime instanceof PreferredTierSelectorStrategy);
+    PreferredTierSelectorStrategy realtimePreferredStrategy = 
(PreferredTierSelectorStrategy) realtime;
+    Assert.assertEquals("realtime_tier", 
realtimePreferredStrategy.getConfig().getTier());
+    Assert.assertEquals("highest", 
realtimePreferredStrategy.getConfig().getPriority());
+  }
+
+  private Injector makeBrokerInjector(final Properties props)
+  {
+    final Injector baseInjector = Guice.createInjector(
+        new JacksonModule(),
+        new LifecycleModule(),
+        binder -> {
+          
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(Properties.class).toInstance(props);
+        }
+    );
+
+    final CliBroker broker = new CliBroker();
+    broker.configure(props);
+    broker.configure(props, baseInjector);
+    return broker.makeInjector(Set.of(NodeRole.BROKER));
+  }
+}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
index a5df83b2479..8aa1f5f7bbf 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
@@ -397,6 +397,7 @@ public class BrokerSegmentMetadataCacheConcurrencyTest 
extends BrokerSegmentMeta
         druidClientFactory,
         baseView,
         new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy()),
+        null,
         new NoopServiceEmitter(),
         new BrokerSegmentWatcherConfig(),
         filter


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to