This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 111e7338113 feat: add realtimeSegmentsMode query context param (#19486)
111e7338113 is described below

commit 111e7338113220bc62987062c1c40a55f9365703
Author: jtuglu1 <[email protected]>
AuthorDate: Wed May 20 18:12:12 2026 -0700

    feat: add realtimeSegmentsMode query context param (#19486)
    
    This adds the query context parameter realtimeSegmentsMode and deprecates 
realtimeSegmentsOnly. realtimeSegmentsOnly=true maps to 
realtimeSegmentsMode=exclusive and realtimeSegmentsOnly=false maps to 
realtimeSegmentsMode=include.
    
    This is useful for things like blue/green deployments, where you want to 
query only the new historical replica ASGs and not touch any "live" nodes 
(neither realtime nor historical).
---
 docs/querying/query-context-reference.md           |  3 +-
 .../java/org/apache/druid/query/QueryContext.java  | 43 ++++++++++-
 .../java/org/apache/druid/query/QueryContexts.java | 42 +++++++++++
 .../org/apache/druid/query/QueryContextTest.java   | 72 +++++++++++++++++++
 .../druid/client/CachingClusteredClient.java       | 18 ++++-
 .../druid/client/CachingClusteredClientTest.java   | 83 +++++++++++++++++++---
 .../query-context-completions.ts                   | 12 +++-
 7 files changed, 258 insertions(+), 15 deletions(-)

diff --git a/docs/querying/query-context-reference.md 
b/docs/querying/query-context-reference.md
index 511e8b13b69..c485c0231c0 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -71,7 +71,8 @@ Unless otherwise noted, the following parameters apply to all 
query types, and t
 |`setProcessingThreadNames`|`true`| Whether processing thread names will be 
set to `queryType_dataSource_intervals` while processing a query. This aids in 
interpreting thread dumps, and is on by default. Query overhead can be reduced 
slightly by setting this to `false`. This has a tiny effect in most scenarios, 
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
 |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge 
two Project operators when inlining expressions causes complexity to increase. 
Implemented as a workaround to exception `There are not enough rules to produce 
a node with desired properties: convention=DRUID, sort=[]` thrown after 
rejecting the merge of two projects.|
 |`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should 
be queried by brokers. Clone servers are created by the `cloneServers` 
Coordinator dynamic configuration. Possible values are `excludeClones`, 
`includeClones` and `preferClones`. `excludeClones` means that clone 
Historicals are not queried by the broker. `preferClones` indicates that when 
given a choice between the clone Historical and the original Historical which 
is being cloned, the broker chooses the clones [...]
-|`realtimeSegmentsOnly` |`false`| When set to true, only query realtime 
segments. Historical segments are excluded. |
+|`realtimeSegmentsMode` |`include`| Controls whether realtime segments are 
queried. `include` queries all segments, including realtime. `exclude` skips 
realtime segments. `exclusive` queries only realtime segments. |
+|`realtimeSegmentsOnly` |`false`| **Deprecated.** Use 
`realtimeSegmentsMode=exclusive` instead. When set to `true`, this is 
equivalent to `realtimeSegmentsMode=exclusive`. When set to `false`, this is 
equivalent to `realtimeSegmentsMode=include`.|
 
 ## Parameters by query type
 
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java 
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 39f325d1089..c2929626002 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
 import org.apache.druid.query.QueryContexts.Vectorize;
 import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.filter.TypedInFilter;
@@ -781,8 +782,48 @@ public class QueryContext
     return getBoolean(QueryContexts.CTX_PREPLANNED, 
QueryContexts.DEFAULT_PREPLANNED);
   }
 
+  /**
+   * Returns the realtime segments mode for this query. If {@link 
QueryContexts#REALTIME_SEGMENTS_MODE} is absent
+   * or null, falls back to the deprecated {@code realtimeSegmentsOnly} 
boolean: {@code true} maps
+   * to {@link RealtimeSegmentsMode#EXCLUSIVE}; otherwise returns {@link 
RealtimeSegmentsMode#INCLUDE}.
+   * Throws {@link BadQueryContextException} if both fields are set 
simultaneously.
+   */
+  public RealtimeSegmentsMode getRealtimeSegmentsMode()
+  {
+    RealtimeSegmentsMode mode = getEnum(
+        QueryContexts.REALTIME_SEGMENTS_MODE,
+        RealtimeSegmentsMode.class,
+        null
+    );
+    boolean hasDeprecatedFlag = get(QueryContexts.REALTIME_SEGMENTS_ONLY) != 
null;
+    if (mode != null && hasDeprecatedFlag) {
+      throw new BadQueryContextException(
+          StringUtils.format(
+              "Cannot set both [%s] and deprecated [%s]; use [%s] only.",
+              QueryContexts.REALTIME_SEGMENTS_MODE,
+              QueryContexts.REALTIME_SEGMENTS_ONLY,
+              QueryContexts.REALTIME_SEGMENTS_MODE
+          )
+      );
+    }
+    if (mode != null) {
+      return mode;
+    }
+    if (hasDeprecatedFlag) {
+      // Backward-compat: honour the deprecated realtimeSegmentsOnly flag.
+      return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY, 
QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY)
+             ? RealtimeSegmentsMode.EXCLUSIVE
+             : QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
+    }
+    return QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
+  }
+
+  /**
+   * @deprecated Use {@link #getRealtimeSegmentsMode()} instead.
+   */
+  @Deprecated
   public boolean isRealtimeSegmentsOnly()
   {
-    return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY, 
QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY);
+    return getRealtimeSegmentsMode() == RealtimeSegmentsMode.EXCLUSIVE;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 1cb8aa24cf4..44dffc9a427 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -146,9 +146,20 @@ public class QueryContexts
   public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = 
"COUPLED";
   public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = 
"DECOUPLED";
 
+  /**
+   * @deprecated Use {@link #REALTIME_SEGMENTS_MODE} instead.
+   */
+  @Deprecated
   public static final String REALTIME_SEGMENTS_ONLY = "realtimeSegmentsOnly";
+  /**
+   * @deprecated Use {@link #DEFAULT_REALTIME_SEGMENTS_MODE} instead.
+   */
+  @Deprecated
   public static final boolean DEFAULT_REALTIME_SEGMENTS_ONLY = false;
 
+  public static final String REALTIME_SEGMENTS_MODE = "realtimeSegmentsMode";
+  public static final RealtimeSegmentsMode DEFAULT_REALTIME_SEGMENTS_MODE = 
RealtimeSegmentsMode.INCLUDE;
+
   public static final String CTX_PREPLANNED = "prePlanned";
   public static final boolean DEFAULT_PREPLANNED = true;
 
@@ -233,6 +244,37 @@ public class QueryContexts
     }
   }
 
+  /**
+   * Classifies segments by whether a historical replica exists
+   * (see {@link 
org.apache.druid.client.selector.ServerSelector#isRealtimeSegment()}: a segment 
is
+   * "realtime" only when it has realtime servers and zero historical servers).
+   */
+  public enum RealtimeSegmentsMode
+  {
+    /** Query all segments, including realtime (default). */
+    INCLUDE,
+    /** Query only realtime segments. */
+    EXCLUSIVE,
+    /** Skip realtime segments; query only historical. */
+    EXCLUDE;
+
+    @JsonCreator
+    public static RealtimeSegmentsMode fromString(String str)
+    {
+      if (str == null) {
+        return null;
+      }
+      return RealtimeSegmentsMode.valueOf(StringUtils.toUpperCase(str));
+    }
+
+    @Override
+    @JsonValue
+    public String toString()
+    {
+      return StringUtils.toLowerCase(name());
+    }
+  }
+
   private QueryContexts()
   {
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java 
b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
index 0cb931e50d2..d5550bc28dc 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
@@ -430,6 +430,7 @@ public class QueryContextTest
   }
 
   @Test
+  @SuppressWarnings("deprecation")
   public void testIsRealtimeSegmentsOnly()
   {
     assertFalse(QueryContext.empty().isRealtimeSegmentsOnly());
@@ -440,6 +441,77 @@ public class QueryContextTest
     );
   }
 
+  @Test
+  public void testGetRealtimeSegmentsMode()
+  {
+    assertEquals(
+        QueryContexts.RealtimeSegmentsMode.INCLUDE,
+        QueryContext.empty().getRealtimeSegmentsMode()
+    );
+    assertEquals(
+        QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
+        QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"exclusive"))
+                   .getRealtimeSegmentsMode()
+    );
+    assertEquals(
+        QueryContexts.RealtimeSegmentsMode.EXCLUDE,
+        QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"exclude"))
+                   .getRealtimeSegmentsMode()
+    );
+    assertEquals(
+        QueryContexts.RealtimeSegmentsMode.INCLUDE,
+        QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"include"))
+                   .getRealtimeSegmentsMode()
+    );
+  }
+
+  @Test
+  public void testGetRealtimeSegmentsModeBackwardCompat()
+  {
+    // realtimeSegmentsOnly=true maps to EXCLUSIVE
+    assertEquals(
+        QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
+        QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, 
true))
+                   .getRealtimeSegmentsMode()
+    );
+    // realtimeSegmentsOnly=false maps to INCLUDE (default)
+    assertEquals(
+        QueryContexts.RealtimeSegmentsMode.INCLUDE,
+        QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, 
false))
+                   .getRealtimeSegmentsMode()
+    );
+  }
+
+  @Test
+  public void testGetRealtimeSegmentsModeConflictThrows()
+  {
+    BadQueryContextException e = assertThrows(
+        BadQueryContextException.class,
+        () -> QueryContext.of(ImmutableMap.of(
+            QueryContexts.REALTIME_SEGMENTS_ONLY, true,
+            QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"
+        )).getRealtimeSegmentsMode()
+    );
+    assertEquals(
+        "Cannot set both [realtimeSegmentsMode] and deprecated 
[realtimeSegmentsOnly]; use [realtimeSegmentsMode] only.",
+        e.getMessage()
+    );
+  }
+
+  @Test
+  public void testGetRealtimeSegmentsModeInvalidValue()
+  {
+    BadQueryContextException e = assertThrows(
+        BadQueryContextException.class,
+        () -> 
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"badvalue"))
+                         .getRealtimeSegmentsMode()
+    );
+    assertEquals(
+        "Expected key [realtimeSegmentsMode] to be referring to one of the 
values [INCLUDE,EXCLUSIVE,EXCLUDE] of enum [RealtimeSegmentsMode], but got 
[badvalue]",
+        e.getMessage()
+    );
+  }
+
   @Test
   public void testSerialization() throws Exception
   {
diff --git 
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java 
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index eb0a5a83997..9305b1b88e9 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -61,6 +61,7 @@ import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -444,7 +445,7 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
       final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
       final SegmentPruner segmentPruner = ev.getSegmentPruner();
 
-      boolean isRealtimeSegmentOnly = query.context().isRealtimeSegmentsOnly();
+      RealtimeSegmentsMode realtimeSegmentsMode = 
query.context().getRealtimeSegmentsMode();
       // Filter unneeded chunks based on partition dimension
       for (TimelineObjectHolder<String, ServerSelector> holder : 
serversLookup) {
         final Collection<PartitionChunk<ServerSelector>> filteredChunks;
@@ -458,8 +459,19 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
         }
         for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
           ServerSelector server = chunk.getObject();
-          if (isRealtimeSegmentOnly && !server.isRealtimeSegment()) {
-            continue; // Skip historical segments when only realtime segments 
are requested
+          switch (realtimeSegmentsMode) {
+            case EXCLUSIVE:
+              if (!server.isRealtimeSegment()) {
+                continue;
+              }
+              break;
+            case EXCLUDE:
+              if (server.isRealtimeSegment()) {
+                continue;
+              }
+              break;
+            case INCLUDE:
+              break;
           }
           final SegmentDescriptor segment = new SegmentDescriptor(
               holder.getInterval(),
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 4f76b7b52c1..94c259314de 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -3132,27 +3132,94 @@ public class CachingClusteredClientTest
     selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], 
null), dataSegment);
     timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector));
 
-    final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
+    // include (default): historical segment is included
+    final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
             .dataSource(DATA_SOURCE)
             .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
-            .context(ImmutableMap.of("realtimeSegmentsOnly", false))
+            .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"include"))
             .randomQueryId()
             .build();
 
-    final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder()
+    // exclusive: only realtime segments — historical segment is excluded
+    final TimeBoundaryQuery queryExclusive = 
Druids.newTimeBoundaryQueryBuilder()
+            .dataSource(DATA_SOURCE)
+            .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+            .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"exclusive"))
+            .randomQueryId()
+            .build();
+
+    // backward compat: realtimeSegmentsOnly=true maps to EXCLUSIVE
+    final TimeBoundaryQuery queryLegacyTrue = 
Druids.newTimeBoundaryQueryBuilder()
             .dataSource(DATA_SOURCE)
             .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
-            .context(ImmutableMap.of("realtimeSegmentsOnly", true))
+            .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, 
true))
             .randomQueryId()
             .build();
 
     final ResponseContext responseContext = initializeResponseContext();
 
-    getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
-    getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext);
+    getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
+    getDefaultQueryRunner().run(QueryPlus.wrap(queryExclusive), 
responseContext);
+    getDefaultQueryRunner().run(QueryPlus.wrap(queryLegacyTrue), 
responseContext);
+
+    final Map<String, Integer> remainingResponseMap = (Map<String, Integer>) 
responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
+    Assert.assertEquals(1, 
remainingResponseMap.get(queryInclude.getId()).intValue());
+    Assert.assertEquals(0, 
remainingResponseMap.get(queryExclusive.getId()).intValue());
+    Assert.assertEquals(0, 
remainingResponseMap.get(queryLegacyTrue.getId()).intValue());
+  }
+
+  @Test
+  public void testRealtimeSegmentsModeExclude()
+  {
+    final Interval interval = Intervals.of("2016-01-01/2016-01-02");
+    final Interval queryInterval = 
Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00");
+    final DataSegment dataSegment = new DataSegment(
+            "dataSource",
+            interval,
+            "ver",
+            ImmutableMap.of("type", "hdfs", "path", "/tmp"),
+            ImmutableList.of("product"),
+            ImmutableList.of("visited_sum"),
+            NoneShardSpec.instance(),
+            9,
+            12334
+    );
+
+    // selector backed only by a realtime server — isRealtimeSegment() == true
+    final DruidServer realtimeServer = new DruidServer(
+            "rt1", "rt1", null, 10, null, ServerType.REALTIME, 
DruidServer.DEFAULT_TIER, 0
+    );
+    final ServerSelector realtimeSelector = new ServerSelector(
+            dataSegment,
+            new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy()),
+            HistoricalFilter.IDENTITY_FILTER
+    );
+    realtimeSelector.addServerAndUpdateSegment(new 
QueryableDruidServer(realtimeServer, null), dataSegment);
+    timeline.add(interval, "ver", new 
SingleElementPartitionChunk<>(realtimeSelector));
+
+    // exclude: realtime-only segment is skipped
+    final TimeBoundaryQuery queryExclude = Druids.newTimeBoundaryQueryBuilder()
+            .dataSource(DATA_SOURCE)
+            .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+            .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"exclude"))
+            .randomQueryId()
+            .build();
+
+    // include: realtime-only segment is included
+    final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
+            .dataSource(DATA_SOURCE)
+            .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+            .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, 
"include"))
+            .randomQueryId()
+            .build();
+
+    final ResponseContext responseContext = initializeResponseContext();
+    getDefaultQueryRunner().run(QueryPlus.wrap(queryExclude), responseContext);
+    getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
+
     final Map<String, Integer> remainingResponseMap = (Map<String, Integer>) 
responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
-    Assert.assertEquals(1, remainingResponseMap.get(query.getId()).intValue());
-    Assert.assertEquals(0, 
remainingResponseMap.get(query2.getId()).intValue());
+    Assert.assertEquals(0, 
remainingResponseMap.get(queryExclude.getId()).intValue());
+    Assert.assertEquals(1, 
remainingResponseMap.get(queryInclude.getId()).intValue());
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts 
b/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
index 9e3cb8dca8d..d7e67b7f492 100644
--- a/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
+++ b/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
@@ -73,8 +73,8 @@ export const QUERY_CONTEXT_COMPLETIONS: JsonCompletionRule[] 
= [
         documentation: 'Enable vectorized query execution',
       },
       {
-        value: 'realtimeSegmentsOnly',
-        documentation: 'Whether to query only realtime segments',
+        value: 'realtimeSegmentsMode',
+        documentation: 'Controls whether realtime segments are queried',
       },
       {
         value: 'vectorSize',
@@ -152,4 +152,12 @@ export const QUERY_CONTEXT_COMPLETIONS: 
JsonCompletionRule[] = [
       { value: 'force', documentation: 'Force vectorized execution' },
     ],
   },
+  {
+    path: '$.realtimeSegmentsMode',
+    completions: [
+      { value: 'include', documentation: 'Query all segments, including 
realtime (default)' },
+      { value: 'exclude', documentation: 'Skip realtime segments; query only 
historical' },
+      { value: 'exclusive', documentation: 'Query only realtime segments' },
+    ],
+  },
 ];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to