This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b8eafc1bf54 Add pluggable table samplers with precomputed broker routing entries and tableSampler query option (#17532)
b8eafc1bf54 is described below

commit b8eafc1bf54d8a80770a94be2e0fe59e7e04e13c
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Feb 11 23:07:06 2026 -0800

    Add pluggable table samplers with precomputed broker routing entries and tableSampler query option (#17532)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   2 +
 .../routing/manager/BaseBrokerRoutingManager.java  | 166 +++++++++++++++++--
 .../manager/MultiClusterRoutingManager.java        |   9 +-
 .../tablesampler/FirstNSegmentsTableSampler.java   |  76 +++++++++
 .../broker/routing/tablesampler/TableSampler.java  |  48 ++++++
 .../routing/tablesampler/TableSamplerFactory.java  | 164 +++++++++++++++++++
 .../routing/manager/BrokerRoutingManagerTest.java  |  58 +++++++
 .../manager/MultiClusterRoutingManagerTest.java    |  33 +++-
 .../tablesampler/TableSamplerFactoryTest.java      |  98 ++++++++++++
 .../external/ExternalAnnotatedSampler.java         |  42 +++++
 .../common/utils/config/QueryOptionsUtils.java     |   9 +-
 .../common/utils/config/TableConfigSerDeUtils.java |  14 +-
 .../common/utils/config/QueryOptionsUtilsTest.java |  15 ++
 .../spark/v3/datasource/PinotDataWriter.scala      |   2 +-
 .../apache/pinot/core/routing/RoutingManager.java  |   9 ++
 .../tests/custom/TableSamplerIntegrationTest.java  | 175 +++++++++++++++++++++
 .../apache/pinot/query/routing/WorkerManager.java  |  57 +++++--
 .../index/creator/CLPForwardIndexCreatorTest.java  |   2 +-
 .../segment/local/utils/TableConfigUtilsTest.java  |  21 +--
 .../tablesampler/TableSamplerProvider.java         |  46 ++++++
 .../apache/pinot/spi/config/table/TableConfig.java |  54 ++++++-
 .../config/table/sampler/TableSamplerConfig.java   |  62 ++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 .../spi/utils/builder/TableConfigBuilder.java      |   9 +-
 .../pinot/spi/config/table/TableConfigTest.java    |  34 ++++
 .../airlineStats_offline_table_config.json         |   9 ++
 26 files changed, 1163 insertions(+), 55 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index baa96367ff5..f531faf2aee 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -57,6 +57,7 @@ import 
org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
 import 
org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
 import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
+import org.apache.pinot.broker.routing.tablesampler.TableSamplerFactory;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.audit.AuditServiceBinder;
 import org.apache.pinot.common.config.DefaultClusterConfigChangeHandler;
@@ -181,6 +182,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
     ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, 
_clusterName, ServiceRole.BROKER);
     applyCustomConfigs(brokerConf);
+    
TableSamplerFactory.init(_brokerConf.subset(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX));
     
BrokerContext.getInstance().setQueryOperatorFactoryProvider(createQueryOperatorFactoryProvider(_brokerConf));
 
     PinotInsecureMode.setPinotInInsecureMode(
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
index 320fc2eebbb..2223f7c8325 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
@@ -19,12 +19,14 @@
 package org.apache.pinot.broker.routing.manager;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +40,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
@@ -63,12 +67,15 @@ import 
org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPrunerFactory;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
+import org.apache.pinot.broker.routing.tablesampler.TableSampler;
+import org.apache.pinot.broker.routing.tablesampler.TableSamplerFactory;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.SegmentsToQuery;
@@ -83,6 +90,7 @@ import 
org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -448,14 +456,15 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
 
     // Update routing entry for all tables
     for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      String tableNameWithType = routingEntry.getTableNameWithType();
       try {
-        Object tableLock = 
getRoutingTableBuildLock(routingEntry.getTableNameWithType());
+        Object tableLock = getRoutingTableBuildLock(tableNameWithType);
         synchronized (tableLock) {
           routingEntry.onInstancesChange(_routableServers, changedServers);
         }
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while updating routing entry 
on instances change for table: {}",
-            routingEntry.getTableNameWithType(), e);
+            tableNameWithType, e);
       }
     }
     long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
@@ -524,14 +533,15 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
     _routableServers = routableServers;
     List<String> changedServers = Collections.singletonList(instanceId);
     for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      String tableNameWithType = routingEntry.getTableNameWithType();
       try {
-        Object tableLock = 
getRoutingTableBuildLock(routingEntry.getTableNameWithType());
+        Object tableLock = getRoutingTableBuildLock(tableNameWithType);
         synchronized (tableLock) {
           routingEntry.onInstancesChange(_routableServers, changedServers);
         }
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while updating routing entry 
when excluding server: {} for table: {}",
-            instanceId, routingEntry.getTableNameWithType(), e);
+            instanceId, tableNameWithType, e);
       }
     }
     LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing 
entries)", instanceId,
@@ -568,14 +578,15 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
     _routableServers = routableServers;
     List<String> changedServers = Collections.singletonList(instanceId);
     for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      String tableNameWithType = routingEntry.getTableNameWithType();
       try {
-        Object tableLock = 
getRoutingTableBuildLock(routingEntry.getTableNameWithType());
+        Object tableLock = getRoutingTableBuildLock(tableNameWithType);
         synchronized (tableLock) {
           routingEntry.onInstancesChange(_routableServers, changedServers);
         }
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while updating routing entry 
when including server: {} for table: {}",
-            instanceId, routingEntry.getTableNameWithType(), e);
+            instanceId, tableNameWithType, e);
       }
     }
     LOGGER.info("Included server: {} to routing in {}ms (updated {} routing 
entries)", instanceId,
@@ -810,10 +821,52 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
       }
       segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
 
+      // Build table sampler contexts keyed by normalized sampler name.
+      Map<String, SamplerInfo> samplerInfos = Map.of();
+      List<TableSamplerConfig> tableSamplerConfigs = 
tableConfig.getTableSamplers();
+      if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+        Map<String, SamplerInfo> configuredSamplerInfos = 
Maps.newHashMapWithExpectedSize(tableSamplerConfigs.size());
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || 
StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, 
samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          String normalizedSamplerName = normalizeSamplerName(samplerName);
+          if (configuredSamplerInfos.containsKey(normalizedSamplerName)) {
+            LOGGER.warn("Skipping duplicate normalized table sampler name: 
'{}' for table: {}", samplerName,
+                tableNameWithType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableConfig, samplerConfig, _propertyStore);
+            Set<String> samplerPreSelectedOnlineSegments = 
sampler.sampleSegments(preSelectedOnlineSegments);
+            SegmentSelector samplerSegmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
+            samplerSegmentSelector.init(idealState, externalView, 
samplerPreSelectedOnlineSegments);
+            InstanceSelector samplerInstanceSelector =
+                InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
+                    adaptiveServerSelector, _pinotConfig, _routableServers, 
_enabledServerInstanceMap,
+                    idealState, externalView, 
samplerPreSelectedOnlineSegments);
+            configuredSamplerInfos.put(normalizedSamplerName,
+                new SamplerInfo(sampler, samplerSegmentSelector, 
samplerInstanceSelector));
+          } catch (Exception e) {
+            LOGGER.error("Caught unexpected exception while building routing 
for table sampler: {} for table: {}",
+                samplerName, tableNameWithType, e);
+          }
+        }
+        if (!configuredSamplerInfos.isEmpty()) {
+          samplerInfos = configuredSamplerInfos;
+        }
+      }
+
       RoutingEntry routingEntry =
           new RoutingEntry(tableNameWithType, idealStatePath, 
externalViewPath, segmentPreSelector, segmentSelector,
               segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, segmentZkMetadataFetcher,
-              timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, 
!idealState.isEnabled());
+              timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, 
samplerInfos,
+              !idealState.isEnabled());
       if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
         LOGGER.info("Built routing for table: {}", tableNameWithType);
       } else {
@@ -1048,7 +1101,9 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
     if (routingEntry == null) {
       return null;
     }
-    InstanceSelector.SelectionResult selectionResult = 
routingEntry.calculateRouting(brokerRequest, requestId);
+    String samplerName = extractSamplerName(brokerRequest);
+    InstanceSelector.SelectionResult selectionResult =
+        routingEntry.calculateRouting(brokerRequest, requestId, samplerName);
     return new RoutingTable(getServerInstanceToSegmentsMap(tableNameWithType, 
selectionResult),
         selectionResult.getUnavailableSegments(), 
selectionResult.getNumPrunedSegments());
   }
@@ -1086,9 +1141,30 @@ public abstract class BaseBrokerRoutingManager 
implements RoutingManager, Cluste
   @Nullable
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
+    return getSegments(brokerRequest, extractSamplerName(brokerRequest));
+  }
+
+  @Nullable
+  @Override
+  public List<String> getSegments(BrokerRequest brokerRequest, @Nullable 
String samplerName) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
     RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
-    return routingEntry != null ? routingEntry.getSegments(brokerRequest) : 
null;
+    if (routingEntry == null) {
+      return null;
+    }
+    return routingEntry.getSegments(brokerRequest, samplerName);
+  }
+
+  /**
+   * Canonicalizes a sampler name used as a lookup key: surrounding whitespace is stripped and the result is
+   * lower-cased with {@link Locale#ROOT}, so matching is case-insensitive and independent of the JVM locale.
+   */
+  private static String normalizeSamplerName(String samplerName) {
+    String trimmedName = samplerName.trim();
+    return trimmedName.toLowerCase(Locale.ROOT);
+  }
+
+  /**
+   * Returns the table sampler requested through the query options of {@code brokerRequest}, or {@code null} when the
+   * request carries no PinotQuery or no query options at all.
+   */
+  @Nullable
+  static String extractSamplerName(BrokerRequest brokerRequest) {
+    if (!brokerRequest.isSetPinotQuery()) {
+      return null;
+    }
+    // The query options map can be null when no options were set on the PinotQuery; guard before delegating so the
+    // default routing is used instead of risking an NPE inside the options helper.
+    Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions();
+    if (queryOptions == null) {
+      return null;
+    }
+    return QueryOptionsUtils.getTableSampler(queryOptions);
   }
 
   @Override
@@ -1172,6 +1248,18 @@ public abstract class BaseBrokerRoutingManager 
implements RoutingManager, Cluste
     }
   }
 
+  /**
+   * Routing components precomputed for one configured table sampler: the sampler itself, plus a segment selector and
+   * an instance selector that were initialized with the sampler's subset of the table's online segments.
+   */
+  private static final class SamplerInfo {
+    final TableSampler _tableSampler;
+    final SegmentSelector _segmentSelector;
+    final InstanceSelector _instanceSelector;
+
+    SamplerInfo(TableSampler sampler, SegmentSelector sampledSegmentSelector,
+        InstanceSelector sampledInstanceSelector) {
+      _tableSampler = sampler;
+      _segmentSelector = sampledSegmentSelector;
+      _instanceSelector = sampledInstanceSelector;
+    }
+  }
+
   private static class RoutingEntry {
     final String _tableNameWithType;
     final String _idealStatePath;
@@ -1182,6 +1270,7 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
     final SegmentPartitionMetadataManager _partitionMetadataManager;
     final InstanceSelector _instanceSelector;
     final Long _queryTimeoutMs;
+    final Map<String, SamplerInfo> _samplerInfos;
     final SegmentZkMetadataFetcher _segmentZkMetadataFetcher;
 
     // Cache IdealState and ExternalView version for the last update
@@ -1197,7 +1286,7 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
         InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, 
int lastUpdateExternalViewVersion,
         SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable 
TimeBoundaryManager timeBoundaryManager,
         @Nullable SegmentPartitionMetadataManager partitionMetadataManager, 
@Nullable Long queryTimeoutMs,
-        boolean disabled) {
+        Map<String, SamplerInfo> samplerInfos, boolean disabled) {
       _tableNameWithType = tableNameWithType;
       _idealStatePath = idealStatePath;
       _externalViewPath = externalViewPath;
@@ -1210,6 +1299,7 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
       _timeBoundaryManager = timeBoundaryManager;
       _partitionMetadataManager = partitionMetadataManager;
       _queryTimeoutMs = queryTimeoutMs;
+      _samplerInfos = samplerInfos;
       _segmentZkMetadataFetcher = segmentZkMetadataFetcher;
       _disabled = disabled;
     }
@@ -1248,6 +1338,40 @@ public abstract class BaseBrokerRoutingManager 
implements RoutingManager, Cluste
       return _disabled;
     }
 
+    /**
+     * Re-samples the pre-selected online segments for every configured sampler and pushes each sampled subset into
+     * that sampler's segment selector and instance selector. A failure in one sampler is logged and does not stop
+     * the remaining samplers from being updated.
+     */
+    private void updateSamplerInfos(IdealState idealState, ExternalView externalView,
+        Set<String> preSelectedOnlineSegments) {
+      // With no samplers configured the map is empty and the loop below does nothing.
+      for (Map.Entry<String, SamplerInfo> samplerEntry : _samplerInfos.entrySet()) {
+        SamplerInfo info = samplerEntry.getValue();
+        try {
+          Set<String> sampledSegments = info._tableSampler.sampleSegments(preSelectedOnlineSegments);
+          info._segmentSelector.onAssignmentChange(idealState, externalView, sampledSegments);
+          info._instanceSelector.onAssignmentChange(idealState, externalView, sampledSegments);
+        } catch (Exception e) {
+          // NOTE(review): after a failure this sampler's selectors may keep a stale (or, if only the segment
+          // selector was updated, partially refreshed) view until the next successful assignment change.
+          LOGGER.error("Caught unexpected exception while sampling segments for sampler: {} for table: {}",
+              samplerEntry.getKey(), _tableNameWithType, e);
+        }
+      }
+    }
+
+    /**
+     * Finds the precomputed routing for the requested sampler, matching on the normalized (trimmed, lower-cased)
+     * name. Returns {@code null}, which callers treat as "use the default routing", when no name is given or the
+     * name is not configured for this table.
+     */
+    @Nullable
+    private SamplerInfo getSamplerInfo(@Nullable String samplerName) {
+      if (StringUtils.isBlank(samplerName)) {
+        return null;
+      }
+      SamplerInfo match = _samplerInfos.get(normalizeSamplerName(samplerName));
+      if (match == null) {
+        // NOTE(review): this runs on the per-query routing path, so an unknown sampler name logs once per query.
+        LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling back to default routing entry",
+            samplerName, _tableNameWithType);
+      }
+      return match;
+    }
+
     // NOTE: The change gets applied in sequence, and before change applied to 
all components, there could be some
     // inconsistency between components, which is fine because the 
inconsistency only exists for the newly changed
     // segments and only lasts for a very short time.
@@ -1260,6 +1384,7 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
       if (_timeBoundaryManager != null) {
         _timeBoundaryManager.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
       }
+      updateSamplerInfos(idealState, externalView, preSelectedOnlineSegments);
       _lastUpdateIdealStateVersion = idealState.getStat().getVersion();
       _lastUpdateExternalViewVersion = externalView.getStat().getVersion();
       _disabled = !idealState.isEnabled();
@@ -1267,6 +1392,11 @@ public abstract class BaseBrokerRoutingManager 
implements RoutingManager, Cluste
 
     void onInstancesChange(Set<String> enabledInstances, List<String> 
changedInstances) {
       _instanceSelector.onInstancesChange(enabledInstances, changedInstances);
+      if (!_samplerInfos.isEmpty()) {
+        for (SamplerInfo samplerInfo : _samplerInfos.values()) {
+          samplerInfo._instanceSelector.onInstancesChange(enabledInstances, 
changedInstances);
+        }
+      }
     }
 
     void refreshSegment(String segment) {
@@ -1276,8 +1406,12 @@ public abstract class BaseBrokerRoutingManager 
implements RoutingManager, Cluste
       }
     }
 
-    InstanceSelector.SelectionResult calculateRouting(BrokerRequest 
brokerRequest, long requestId) {
-      Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
+    InstanceSelector.SelectionResult calculateRouting(BrokerRequest 
brokerRequest, long requestId,
+        @Nullable String samplerName) {
+      SamplerInfo samplerInfo = getSamplerInfo(samplerName);
+      SegmentSelector segmentSelector = samplerInfo != null ? 
samplerInfo._segmentSelector : _segmentSelector;
+      InstanceSelector instanceSelector = samplerInfo != null ? 
samplerInfo._instanceSelector : _instanceSelector;
+      Set<String> selectedSegments = segmentSelector.select(brokerRequest);
       int numTotalSelectedSegments = selectedSegments.size();
       if (!selectedSegments.isEmpty()) {
         for (SegmentPruner segmentPruner : _segmentPruners) {
@@ -1287,7 +1421,7 @@ public abstract class BaseBrokerRoutingManager implements 
RoutingManager, Cluste
       int numPrunedSegments = numTotalSelectedSegments - 
selectedSegments.size();
       if (!selectedSegments.isEmpty()) {
         InstanceSelector.SelectionResult selectionResult =
-            _instanceSelector.select(brokerRequest, new 
ArrayList<>(selectedSegments), requestId);
+            instanceSelector.select(brokerRequest, new 
ArrayList<>(selectedSegments), requestId);
         selectionResult.setNumPrunedSegments(numPrunedSegments);
         return selectionResult;
       } else {
@@ -1296,8 +1430,10 @@ public abstract class BaseBrokerRoutingManager 
implements RoutingManager, Cluste
       }
     }
 
-    List<String> getSegments(BrokerRequest brokerRequest) {
-      Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
+    List<String> getSegments(BrokerRequest brokerRequest, @Nullable String 
samplerName) {
+      SamplerInfo samplerInfo = getSamplerInfo(samplerName);
+      SegmentSelector segmentSelector = samplerInfo != null ? 
samplerInfo._segmentSelector : _segmentSelector;
+      Set<String> selectedSegments = segmentSelector.select(brokerRequest);
       if (!selectedSegments.isEmpty()) {
         for (SegmentPruner segmentPruner : _segmentPruners) {
           selectedSegments = segmentPruner.prune(brokerRequest, 
selectedSegments);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
index 0f247786c76..265e25235b4 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
@@ -187,14 +187,19 @@ public class MultiClusterRoutingManager implements 
RoutingManager {
 
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
+    return getSegments(brokerRequest, 
BaseBrokerRoutingManager.extractSamplerName(brokerRequest));
+  }
+
+  @Override
+  public List<String> getSegments(BrokerRequest brokerRequest, @Nullable 
String samplerName) {
     List<String> combined = new ArrayList<>();
-    List<String> localSegments = 
_localClusterRoutingManager.getSegments(brokerRequest);
+    List<String> localSegments = 
_localClusterRoutingManager.getSegments(brokerRequest, samplerName);
     if (localSegments != null) {
       combined.addAll(localSegments);
     }
     for (BaseBrokerRoutingManager remoteCluster : 
_remoteClusterRoutingManagers) {
       try {
-        List<String> remoteSegments = remoteCluster.getSegments(brokerRequest);
+        List<String> remoteSegments = remoteCluster.getSegments(brokerRequest, 
samplerName);
         if (remoteSegments != null) {
           combined.addAll(remoteSegments);
         }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java
new file mode 100644
index 00000000000..5df93ce2f53
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+
+/**
+ * Selects the first N segments after sorting segment names lexicographically.
+ *
+ * <p>Config:
+ * <ul>
+ *   <li>{@code properties.numSegments}: positive integer</li>
+ * </ul>
+ */
+@TableSamplerProvider(name = FirstNSegmentsTableSampler.TYPE)
+public class FirstNSegmentsTableSampler implements TableSampler {
+  public static final String TYPE = "firstN";
+  public static final String PROP_NUM_SEGMENTS = "numSegments";
+
+  private int _numSegments;
+
+  @Override
+  public void init(TableConfig tableConfig, TableSamplerConfig samplerConfig,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    Map<String, String> props = samplerConfig.getProperties();
+    if (MapUtils.isEmpty(props) || !props.containsKey(PROP_NUM_SEGMENTS)) {
+      throw new IllegalArgumentException(
+          "Missing required property '" + PROP_NUM_SEGMENTS + "' for table 
sampler type '" + TYPE + "'");
+    }
+    _numSegments = Integer.parseInt(props.get(PROP_NUM_SEGMENTS));
+    if (_numSegments <= 0) {
+      throw new IllegalArgumentException("'" + PROP_NUM_SEGMENTS + "' must be 
positive");
+    }
+  }
+
+  @Override
+  public Set<String> sampleSegments(Set<String> onlineSegments) {
+    if (onlineSegments.isEmpty()) {
+      return Collections.emptySet();
+    }
+    if (onlineSegments.size() <= _numSegments) {
+      return onlineSegments;
+    }
+    List<String> sorted = new ArrayList<>(onlineSegments);
+    Collections.sort(sorted);
+    return new HashSet<>(sorted.subList(0, _numSegments));
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java
new file mode 100644
index 00000000000..1bc88ffc1ba
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+
+/**
+ * A {@code TableSampler} deterministically selects a subset of segments from 
the set of online segments for a table.
+ *
+ * <p>Selection is performed during routing table build/update so there is no 
additional per-query overhead beyond
+ * selecting the pre-built routing entry.
+ */
+public interface TableSampler {
+
+  /**
+   * Initializes the sampler for a specific table and sampler config.
+   */
+  void init(TableConfig tableConfig, TableSamplerConfig samplerConfig, 
ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+  /**
+   * Selects a subset of segments from the provided online segments.
+   *
+   * <p>Implementations must not mutate the input set because the same 
pre-selected segment set can be reused by
+   * multiple samplers. Implementations must return a non-null set.
+   */
+  Set<String> sampleSegments(Set<String> onlineSegments);
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java
new file mode 100644
index 00000000000..1be8b236952
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableSamplerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableSamplerFactory.class);
+  private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages";
+  // Keep this list in sync with built-in TableSampler locations; additional 
packages can be configured via broker
+  // config, but defaults should always include built-ins.
+  private static final List<String> DEFAULT_ANNOTATION_PACKAGES =
+      List.of("org.apache.pinot.broker.routing.tablesampler");
+  private static final Map<String, String> REGISTERED_TABLE_SAMPLER_CLASS_MAP 
= new ConcurrentHashMap<>();
+
+  // Static-only factory; instantiation is intentionally blocked.
+  private TableSamplerFactory() {
+  }
+
+  public static void init(PinotConfiguration tableSamplerConfig) {
+    if (tableSamplerConfig == null) {
+      return;
+    }
+    registerAnnotatedTableSamplers(tableSamplerConfig);
+  }
+
+  /**
+   * Maps a sampler alias (normalized via {@code normalizeType}) to a {@link TableSampler} class name. Blank aliases
+   * or class names are skipped with a warning; re-registering an alias with a different class replaces the mapping.
+   */
+  public static void register(String alias, String className) {
+    if (StringUtils.isBlank(alias)) {
+      LOGGER.warn("Skipping table sampler registration because alias is blank");
+      return;
+    }
+    if (StringUtils.isBlank(className)) {
+      LOGGER.warn("Skipping table sampler registration for alias '{}' because class name is blank", alias);
+      return;
+    }
+    String aliasKey = normalizeType(alias);
+    String cleanClassName = className.trim();
+    String replacedClassName = REGISTERED_TABLE_SAMPLER_CLASS_MAP.put(aliasKey, cleanClassName);
+    if (replacedClassName == null) {
+      LOGGER.info("Registered table sampler alias '{}' -> '{}'", alias, cleanClassName);
+      return;
+    }
+    if (!replacedClassName.equals(cleanClassName)) {
+      LOGGER.warn("Overriding table sampler alias '{}' from '{}' to '{}'", alias, replacedClassName, cleanClassName);
+    }
+  }
+
+  public static TableSampler create(String type) {
+    String resolvedClassName = resolveClassName(type);
+    String classNameToLoad = resolvedClassName != null ? resolvedClassName : 
type;
+    try {
+      return PluginManager.get().createInstance(classNameToLoad);
+    } catch (Exception e) {
+      String errorMessage = resolvedClassName != null
+          ? String.format("Failed to create TableSampler for alias '%s' mapped 
to class '%s'", type,
+              resolvedClassName)
+          : "Failed to create TableSampler for type: " + type;
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRegistry() {
+    REGISTERED_TABLE_SAMPLER_CLASS_MAP.clear();
+  }
+
+  private static void registerAnnotatedTableSamplers(PinotConfiguration 
tableSamplerConfig) {
+    List<String> configuredPackages = 
getConfiguredAnnotationPackages(tableSamplerConfig);
+    LinkedHashSet<String> combinedPackages = new 
LinkedHashSet<>(DEFAULT_ANNOTATION_PACKAGES);
+    for (String packageName : configuredPackages) {
+      if (StringUtils.isNotBlank(packageName)) {
+        combinedPackages.add(packageName.trim());
+      }
+    }
+    List<String> sanitizedPackages = new ArrayList<>(combinedPackages);
+    if (sanitizedPackages.isEmpty()) {
+      LOGGER.info("No table sampler annotation packages configured");
+      return;
+    }
+    Set<Class<?>> samplerClasses =
+        PinotReflectionUtils.getClassesThroughReflection(sanitizedPackages, 
".*", TableSamplerProvider.class);
+    for (Class<?> samplerClass : samplerClasses) {
+      TableSamplerProvider annotation = 
samplerClass.getAnnotation(TableSamplerProvider.class);
+      if (annotation == null || !annotation.enabled()) {
+        continue;
+      }
+      if (!TableSampler.class.isAssignableFrom(samplerClass)) {
+        LOGGER.warn("Skipping table sampler class '{}' because it does not 
implement TableSampler",
+            samplerClass.getName());
+        continue;
+      }
+      if (!Modifier.isPublic(samplerClass.getModifiers()) || 
Modifier.isAbstract(samplerClass.getModifiers())) {
+        LOGGER.warn("Skipping table sampler class '{}' because it is not a 
public concrete class",
+            samplerClass.getName());
+        continue;
+      }
+      String alias = annotation.name();
+      if (StringUtils.isBlank(alias)) {
+        LOGGER.warn("Skipping table sampler class '{}' because annotation name 
is blank", samplerClass.getName());
+        continue;
+      }
+      register(alias, samplerClass.getName());
+    }
+  }
+
+  private static List<String> 
getConfiguredAnnotationPackages(PinotConfiguration tableSamplerConfig) {
+    String configuredPackages = 
tableSamplerConfig.getProperty(ANNOTATION_PACKAGES_KEY, "");
+    if (StringUtils.isBlank(configuredPackages)) {
+      return List.of();
+    }
+    List<String> packageList = new ArrayList<>();
+    for (String packageName : configuredPackages.split(",")) {
+      if (StringUtils.isNotBlank(packageName)) {
+        packageList.add(packageName.trim());
+      }
+    }
+    return packageList;
+  }
+
+  private static String resolveClassName(String type) {
+    if (StringUtils.isBlank(type)) {
+      return null;
+    }
+    return REGISTERED_TABLE_SAMPLER_CLASS_MAP.get(normalizeType(type));
+  }
+
+  /**
+   * Normalizes a table sampler alias for registry lookup.
+   *
+   * <p>Both registration and lookup go through this method, so aliases are 
matched case-insensitively after trimming.
+   */
+  private static String normalizeType(String type) {
+    return type.trim().toLowerCase(Locale.ROOT);
+  }
+}
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
index e03153e2f08..59786554584 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.broker.routing.manager;
 
+import java.lang.reflect.Constructor;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
@@ -30,7 +32,17 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
+import 
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
+import 
org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionMetadataManager;
+import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
+import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
+import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -49,6 +61,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 
 
@@ -57,6 +70,7 @@ public class BrokerRoutingManagerTest {
   private static final String SERVER_HOST = "localhost";
   private static final int SERVER_PORT = 8000;
   private static final String INSTANCE_CONFIGS_PATH = "/CONFIGS/PARTICIPANT";
+  private static final String TEST_TABLE = "testTable_OFFLINE";
 
   private AutoCloseable _mocks;
 
@@ -204,6 +218,50 @@ public class BrokerRoutingManagerTest {
     verify(_serverReenableCallback, never()).accept(any());
   }
 
+  /**
+   * Verifies that the routing manager surfaces the exact time-boundary and partition objects held by a routing
+   * entry's managers, rather than copies.
+   */
+  @Test
+  public void testSamplerContextSharesTimeBoundaryAndPartitionMetadata()
+      throws Exception {
+    TimeBoundaryManager tbManager = mock(TimeBoundaryManager.class);
+    SegmentPartitionMetadataManager spmManager = mock(SegmentPartitionMetadataManager.class);
+
+    TimeBoundaryInfo timeBoundaryInfo = new TimeBoundaryInfo("DaysSinceEpoch", "20000");
+    TablePartitionInfo partitionInfo = new TablePartitionInfo(TEST_TABLE, "partitionCol", "Modulo", 2,
+        List.of(Collections.emptyList(), Collections.emptyList()), Collections.emptyList());
+    TablePartitionReplicatedServersInfo replicatedServersInfo = mock(TablePartitionReplicatedServersInfo.class);
+
+    // Stub the managers so each getter hands back the instance created above.
+    when(tbManager.getTimeBoundaryInfo()).thenReturn(timeBoundaryInfo);
+    when(spmManager.getTablePartitionInfo()).thenReturn(partitionInfo);
+    when(spmManager.getTablePartitionReplicatedServersInfo()).thenReturn(replicatedServersInfo);
+
+    putRoutingEntry(TEST_TABLE, createRoutingEntry(TEST_TABLE, tbManager, spmManager, Map.of()));
+
+    // Identity (not equality) checks: the routing manager must return the managers' own objects.
+    assertSame(_routingManager.getTimeBoundaryInfo(TEST_TABLE), timeBoundaryInfo);
+    assertSame(_routingManager.getTablePartitionInfo(TEST_TABLE), partitionInfo);
+    assertSame(_routingManager.getTablePartitionReplicatedServersInfo(TEST_TABLE), replicatedServersInfo);
+  }
+
+  /**
+   * Reflectively builds a {@code BaseBrokerRoutingManager$RoutingEntry} wired with the supplied time boundary manager,
+   * partition metadata manager and sampler infos; every other collaborator is a Mockito mock. The parameter-type
+   * list must mirror the RoutingEntry constructor exactly, so update it whenever that constructor changes.
+   */
+  private static Object createRoutingEntry(String tableNameWithType, TimeBoundaryManager timeBoundaryManager,
+      SegmentPartitionMetadataManager partitionMetadataManager, Map<String, ?> samplerInfos)
+      throws Exception {
+    String nestedClassName = BaseBrokerRoutingManager.class.getName() + "$RoutingEntry";
+    Class<?>[] parameterTypes = {
+        String.class, String.class, String.class,
+        SegmentPreSelector.class, SegmentSelector.class, List.class, InstanceSelector.class, int.class, int.class,
+        SegmentZkMetadataFetcher.class, TimeBoundaryManager.class, SegmentPartitionMetadataManager.class, Long.class,
+        Map.class, boolean.class
+    };
+    Constructor<?> routingEntryConstructor = Class.forName(nestedClassName).getDeclaredConstructor(parameterTypes);
+    routingEntryConstructor.setAccessible(true);
+
+    // Arguments are listed in the same order as parameterTypes above.
+    Object[] arguments = {
+        tableNameWithType,
+        "/IDEALSTATES/" + tableNameWithType,
+        "/EXTERNALVIEW/" + tableNameWithType,
+        mock(SegmentPreSelector.class),
+        mock(SegmentSelector.class),
+        Collections.<SegmentPruner>emptyList(),
+        mock(InstanceSelector.class),
+        1,
+        1,
+        mock(SegmentZkMetadataFetcher.class),
+        timeBoundaryManager,
+        partitionMetadataManager,
+        null,
+        samplerInfos,
+        false
+    };
+    return routingEntryConstructor.newInstance(arguments);
+  }
+
+  /** Inserts a (reflectively created) routing entry directly into the routing manager's entry map. */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void putRoutingEntry(String tableNameWithType, Object routingEntry) {
+    // Raw view: the map's value type is the RoutingEntry type, which this test only holds as Object.
+    ((Map) _routingManager._routingEntryMap).put(tableNameWithType, routingEntry);
+  }
+
   /**
    * Creates a ZNRecord representing an enabled server instance.
    */
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
index f18cfbaf037..b734e2d635a 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
@@ -198,9 +198,9 @@ public class MultiClusterRoutingManagerTest {
     BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
     List<String> remoteSegments = Arrays.asList("seg1");
 
-    
when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null);
-    
when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(remoteSegments);
-    
when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null);
+    when(_localClusterRoutingManager.getSegments(brokerRequest, 
null)).thenReturn(null);
+    when(_remoteClusterRoutingManager1.getSegments(brokerRequest, 
null)).thenReturn(remoteSegments);
+    when(_remoteClusterRoutingManager2.getSegments(brokerRequest, 
null)).thenReturn(null);
 
     List<String> result = 
_multiClusterRoutingManager.getSegments(brokerRequest);
 
@@ -212,15 +212,36 @@ public class MultiClusterRoutingManagerTest {
   @Test
   public void testGetSegmentsReturnsNullWhenAllNull() {
     BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
-    
when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null);
-    
when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(null);
-    
when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null);
+    when(_localClusterRoutingManager.getSegments(brokerRequest, 
null)).thenReturn(null);
+    when(_remoteClusterRoutingManager1.getSegments(brokerRequest, 
null)).thenReturn(null);
+    when(_remoteClusterRoutingManager2.getSegments(brokerRequest, 
null)).thenReturn(null);
 
     List<String> result = 
_multiClusterRoutingManager.getSegments(brokerRequest);
 
     assertNull(result);
   }
 
+  /** A sampler-aware lookup must merge per-cluster sampled segments and never fall back to the sampler-less call. */
+  @Test
+  public void testGetSegmentsWithSamplerName() {
+    String samplerName = "firstOnly";
+    BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+
+    when(_localClusterRoutingManager.getSegments(brokerRequest, samplerName))
+        .thenReturn(Arrays.asList("localSeg"));
+    when(_remoteClusterRoutingManager1.getSegments(brokerRequest, samplerName))
+        .thenReturn(Arrays.asList("remoteSeg"));
+    // The second remote cluster reports no segments for this table.
+    when(_remoteClusterRoutingManager2.getSegments(brokerRequest, samplerName)).thenReturn(null);
+
+    List<String> combined = _multiClusterRoutingManager.getSegments(brokerRequest, samplerName);
+
+    assertNotNull(combined);
+    assertEquals(combined.size(), 2);
+    assertTrue(combined.contains("localSeg"));
+    assertTrue(combined.contains("remoteSeg"));
+    verify(_localClusterRoutingManager, never()).getSegments(brokerRequest);
+    verify(_remoteClusterRoutingManager1, never()).getSegments(brokerRequest);
+    verify(_remoteClusterRoutingManager2, never()).getSegments(brokerRequest);
+  }
+
   @Test
   public void testGetEnabledServerInstanceMapCombinesAll() {
     ServerInstance server1 = createMockServerInstance("server1");
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactoryTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactoryTest.java
new file mode 100644
index 00000000000..f3c575d2891
--- /dev/null
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactoryTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.tablesampler.external.ExternalAnnotatedSampler;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TableSamplerFactoryTest {
+
+  @BeforeMethod
+  public void setUp() {
+    TableSamplerFactory.clearRegistry();
+  }
+
+  /**
+   * The alias registry is static and therefore shared with any later test in the same JVM. Drop the aliases
+   * registered by these tests and re-register the samplers discovered from the default annotation package.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() {
+    TableSamplerFactory.clearRegistry();
+    TableSamplerFactory.init(new PinotConfiguration());
+  }
+
+  @Test
+  public void testRegisterAndCreate() {
+    TableSamplerFactory.register("customFirst", FirstNSegmentsTableSampler.class.getName());
+
+    TableSampler sampler = TableSamplerFactory.create("customFirst");
+
+    Assert.assertTrue(sampler instanceof FirstNSegmentsTableSampler);
+  }
+
+  @Test
+  public void testAliasLookupIsCaseInsensitive() {
+    TableSamplerFactory.register("  CustomFirst  ", FirstNSegmentsTableSampler.class.getName());
+
+    Assert.assertTrue(TableSamplerFactory.create("customfirst") instanceof FirstNSegmentsTableSampler);
+    Assert.assertTrue(TableSamplerFactory.create(" CUSTOMFIRST ") instanceof FirstNSegmentsTableSampler);
+  }
+
+  @Test
+  public void testCreateUnknownTypeThrows() {
+    Assert.expectThrows(RuntimeException.class,
+        () -> TableSamplerFactory.create("org.apache.pinot.broker.routing.tablesampler.DoesNotExist"));
+  }
+
+  @Test
+  public void testDefaultAnnotationRegistration() {
+    TableSamplerFactory.init(new PinotConfiguration());
+
+    TableSampler sampler = TableSamplerFactory.create("annotatedSampler");
+
+    Assert.assertTrue(sampler instanceof AnnotatedSampler);
+  }
+
+  @Test
+  public void testConfiguredPackagesDoNotDisableDefault() {
+    PinotConfiguration config = new PinotConfiguration(
+        Map.of(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX + ".annotation.packages", "com.acme.missing"));
+
+    TableSamplerFactory.init(config.subset(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX));
+
+    TableSampler sampler = TableSamplerFactory.create("annotatedSampler");
+
+    Assert.assertTrue(sampler instanceof AnnotatedSampler);
+  }
+
+  @Test
+  public void testConfiguredPackagesLoadExternalSampler() {
+    PinotConfiguration config = new PinotConfiguration(Map.of(
+        CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX + ".annotation.packages",
+        "org.apache.pinot.broker.routing.tablesampler.external"));
+
+    TableSamplerFactory.init(config.subset(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX));
+
+    TableSampler sampler = TableSamplerFactory.create(ExternalAnnotatedSampler.TYPE);
+
+    Assert.assertTrue(sampler instanceof ExternalAnnotatedSampler);
+  }
+
+  /** Pass-through sampler discovered from the default annotation package. */
+  @TableSamplerProvider(name = "annotatedSampler")
+  public static class AnnotatedSampler implements TableSampler {
+    @Override
+    public void init(TableConfig tableConfig, TableSamplerConfig samplerConfig,
+        ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    }
+
+    @Override
+    public Set<String> sampleSegments(Set<String> onlineSegments) {
+      return onlineSegments;
+    }
+  }
+}
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/external/ExternalAnnotatedSampler.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/external/ExternalAnnotatedSampler.java
new file mode 100644
index 00000000000..0a30a8a2210
--- /dev/null
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/external/ExternalAnnotatedSampler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler.external;
+
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.tablesampler.TableSampler;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+/**
+ * Test-only pass-through sampler in a separate {@code external} package. It is looked up by
+ * {@link #TYPE} after that package is listed in the {@code annotation.packages} table sampler config.
+ */
+@TableSamplerProvider(name = ExternalAnnotatedSampler.TYPE)
+public class ExternalAnnotatedSampler implements TableSampler {
+  /** Alias declared on the {@link TableSamplerProvider} annotation. */
+  public static final String TYPE = "externalAnnotatedSampler";
+
+  @Override
+  public void init(TableConfig config, TableSamplerConfig samplerConfig, ZkHelixPropertyStore<ZNRecord> store) {
+    // Intentionally empty: this sampler needs no configuration.
+  }
+
+  /** Keeps every segment: the input set is returned as-is. */
+  @Override
+  public Set<String> sampleSegments(Set<String> segments) {
+    return segments;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 6cd5f1da334..ede03f8e73b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -41,7 +41,6 @@ import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOv
  * Utils to parse query options.
  */
 public class QueryOptionsUtils {
-
   private QueryOptionsUtils() {
   }
 
@@ -111,6 +110,14 @@ public class QueryOptionsUtils {
     return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS, 
timeoutMsString);
   }
 
+  /**
+   * Returns the table sampler requested through the {@link QueryOptionKey#TABLE_SAMPLER} query option.
+   *
+   * @param queryOptions query options; may be {@code null}
+   * @return the trimmed sampler name, or {@code null} when the options are absent or the value is missing/blank
+   *         (so an empty value such as {@code SET sampler=''} means "no sampler" rather than a sampler named "")
+   */
+  @Nullable
+  public static String getTableSampler(@Nullable Map<String, String> queryOptions) {
+    if (queryOptions == null || queryOptions.isEmpty()) {
+      return null;
+    }
+    String tableSampler = queryOptions.get(QueryOptionKey.TABLE_SAMPLER);
+    if (tableSampler == null) {
+      return null;
+    }
+    tableSampler = tableSampler.trim();
+    return tableSampler.isEmpty() ? null : tableSampler;
+  }
+
   @Nullable
   public static Long getExtraPassiveTimeoutMs(Map<String, String> 
queryOptions) {
     String extraPassiveTimeoutMsString = 
queryOptions.get(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
index 2575c2f6b8c..28f0100e0b8 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
@@ -45,6 +45,7 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -176,10 +177,17 @@ public class TableConfigSerDeUtils {
       });
     }
 
+    List<TableSamplerConfig> tableSamplerConfigs = null;
+    String tableSamplerConfigsString = 
simpleFields.get(TableConfig.TABLE_SAMPLERS_KEY);
+    if (tableSamplerConfigsString != null) {
+      tableSamplerConfigs = 
JsonUtils.stringToObject(tableSamplerConfigsString, new TypeReference<>() {
+      });
+    }
+
     return new TableConfig(tableName, tableType, validationConfig, 
tenantConfig, indexingConfig, customConfig,
         quotaConfig, taskConfig, routingConfig, queryConfig, 
instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
         dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, 
isDimTable, tunerConfigList,
-        instancePartitionsMap, segmentAssignmentConfigMap);
+        instancePartitionsMap, segmentAssignmentConfigMap, 
tableSamplerConfigs);
   }
 
   public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -254,6 +262,10 @@ public class TableConfigSerDeUtils {
       simpleFields.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY,
           JsonUtils.objectToString(segmentAssignmentConfigMap));
     }
+    List<TableSamplerConfig> tableSamplerConfigs = 
tableConfig.getTableSamplers();
+    if (tableSamplerConfigs != null) {
+      simpleFields.put(TableConfig.TABLE_SAMPLERS_KEY, 
JsonUtils.objectToString(tableSamplerConfigs));
+    }
 
     ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
     znRecord.setSimpleFields(simpleFields);
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index 11d09427e59..6547db2880a 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -29,6 +29,7 @@ import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.fail;
 
 
@@ -61,6 +62,20 @@ public class QueryOptionsUtilsTest {
     assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false");
   }
 
+  @Test
+  public void shouldResolveSamplerOptionCaseInsensitively() {
+    Map<String, String> resolved = 
QueryOptionsUtils.resolveCaseInsensitiveOptions(Map.of("SAMPLER", "firstOnly"));
+
+    assertEquals(resolved.get(TABLE_SAMPLER), "firstOnly");
+  }
+
+  @Test
+  public void shouldExtractTableSamplerOption() {
+    assertEquals(QueryOptionsUtils.getTableSampler(Map.of(TABLE_SAMPLER, 
"firstOnly")), "firstOnly");
+    assertNull(QueryOptionsUtils.getTableSampler(Map.of()));
+    assertNull(QueryOptionsUtils.getTableSampler(null));
+  }
+
   @Test
   public void shouldReadIgnoreMissingSegmentsOption() {
     // Given:
diff --git 
a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
 
b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
index 41f86e4d2db..460b00d9757 100644
--- 
a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
+++ 
b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
@@ -185,7 +185,7 @@ class PinotDataWriter[InternalRow](
       new TableCustomConfig(null),
       null, null, null, null, null, null, null,
       null, null, null, null, false, null, null,
-      null)
+      null, null)
 
     val segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, 
pinotSchema)
     segmentGeneratorConfig.setTableName(tableName)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index e0e00cdd04e..5f1150c25cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -83,6 +83,15 @@ public interface RoutingManager {
   @Nullable
   List<String> getSegments(BrokerRequest brokerRequest);
 
+  /**
+   * Returns the segments relevant for the given broker request, optionally restricted by a named table sampler.
+   *
+   * <p>This default implementation ignores {@code samplerName} and simply delegates to
+   * {@link #getSegments(BrokerRequest)}; implementations that support table samplers are expected to override it.
+   *
+   * @param brokerRequest the broker request to route
+   * @param samplerName name of the table sampler to apply, or {@code null} for no sampling
+   * @return the relevant segments, or {@code null} if the table does not exist
+   */
+  @Nullable
+  default List<String> getSegments(BrokerRequest brokerRequest, @Nullable String samplerName) {
+    return getSegments(brokerRequest);
+  }
+
   /**
    * Validate routing exist for a table
    *
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java
new file mode 100644
index 00000000000..819da87ee0f
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * End-to-end checks for table samplers: queries with {@code SET sampler='...'} must only scan the sampled segments,
+ * and partition pruning must still be applied on top of the sampled set.
+ */
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TableSamplerIntegrationTest extends CustomDataQueryClusterIntegrationTest {
+  private static final int DAYS = 7;
+  private static final int SEGMENTS_PER_DAY = 10;
+  private static final int RECORDS_PER_SEGMENT = 1;
+  private static final int BASE_DAY = 20000;
+  // Rows for day index d get partition key (d % 2), so partition 1 holds exactly the odd-indexed days.
+  private static final int NUM_PARTITION_ONE_DAYS = DAYS / 2;
+
+  private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch";
+  private static final String PARTITION_KEY_COL = "PartitionKey";
+
+  @Override
+  public String getTableName() {
+    return "TableSamplerIntegrationTest";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .addSingleValueDimension(PARTITION_KEY_COL, FieldSpec.DataType.INT)
+        .build();
+  }
+
+  /**
+   * Offline table partitioned on the partition-key column (Modulo, 2 partitions) with the partition segment pruner
+   * enabled, plus two first-N samplers: {@code firstOnly} (1 segment) and {@code firstTwo} (2 segments).
+   */
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+    columnPartitionConfigMap.put(PARTITION_KEY_COL, new ColumnPartitionConfig("Modulo", 2));
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
+        .setTimeColumnName(DAYS_SINCE_EPOCH_COL)
+        .setTimeType("DAYS")
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+        .setRoutingConfig(
+            new RoutingConfig(null, List.of(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null, null))
+        .build();
+    tableConfig.setTableSamplers(List.of(
+        new TableSamplerConfig("firstOnly", "firstN", Map.of("numSegments", "1")),
+        new TableSamplerConfig("firstTwo", "firstN", Map.of("numSegments", "2"))));
+    return tableConfig;
+  }
+
+  /** Writes one Avro file per (day, segment) pair; all rows of a day share that day's partition key. */
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    var fieldAssembler = SchemaBuilder.record("myRecord").fields();
+    fieldAssembler.name(DAYS_SINCE_EPOCH_COL).type().intType().noDefault();
+    fieldAssembler.name(PARTITION_KEY_COL).type().intType().noDefault();
+    var avroSchema = fieldAssembler.endRecord();
+
+    List<File> files = new ArrayList<>();
+    for (int day = 0; day < DAYS; day++) {
+      int dayValue = BASE_DAY + day;
+      int partitionKey = day % 2;
+      for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {
+        File avroFile = new File(_tempDir, "data_day_" + day + "_seg_" + seg + ".avro");
+        try (DataFileWriter<GenericData.Record> fileWriter =
+            new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+          fileWriter.create(avroSchema, avroFile);
+          for (int docId = 0; docId < RECORDS_PER_SEGMENT; docId++) {
+            GenericData.Record record = new GenericData.Record(avroSchema);
+            record.put(DAYS_SINCE_EPOCH_COL, dayValue);
+            record.put(PARTITION_KEY_COL, partitionKey);
+            fileWriter.append(record);
+          }
+        }
+        files.add(avroFile);
+      }
+    }
+    return files;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFirstNSamplerForGroupByDay(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String groupByDayQuery = groupByDayQuery();
+
+    // Without a sampler every segment of every day is queried.
+    JsonNode full = postQuery(groupByDayQuery);
+    JsonNode fullRows = full.path("resultTable").path("rows");
+    Assert.assertEquals(fullRows.size(), DAYS);
+    for (int i = 0; i < DAYS; i++) {
+      Assert.assertEquals(fullRows.get(i).get(0).asInt(), BASE_DAY + i);
+      Assert.assertEquals(fullRows.get(i).get(1).asLong(), (long) SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+    }
+    Assert.assertEquals(full.path("numSegmentsQueried").asInt(), DAYS * SEGMENTS_PER_DAY);
+
+    // "firstOnly" scans a single segment, expected to belong to the first day.
+    JsonNode sampled = postQuery(withSampler(groupByDayQuery, "firstOnly"));
+    JsonNode sampledRows = sampled.path("resultTable").path("rows");
+    Assert.assertEquals(sampledRows.size(), 1);
+    Assert.assertEquals(sampledRows.get(0).get(0).asInt(), BASE_DAY);
+    Assert.assertEquals(sampledRows.get(0).get(1).asLong(), (long) RECORDS_PER_SEGMENT);
+    Assert.assertEquals(sampled.path("numSegmentsQueried").asInt(), 1);
+
+    // "firstTwo" scans exactly two segments; only the total row count is asserted.
+    JsonNode sampledTwo = postQuery(withSampler(groupByDayQuery, "firstTwo"));
+    long sampledTwoCount = 0L;
+    for (JsonNode row : sampledTwo.path("resultTable").path("rows")) {
+      sampledTwoCount += row.get(1).asLong();
+    }
+    Assert.assertEquals(sampledTwoCount, 2L * RECORDS_PER_SEGMENT);
+    Assert.assertEquals(sampledTwo.path("numSegmentsQueried").asInt(), 2);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testSamplerRoutingStillAppliesPartitionPruning(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String partitionOneQuery =
+        "SELECT COUNT(*) AS cnt FROM " + getTableName() + " WHERE " + PARTITION_KEY_COL + " = 1";
+
+    JsonNode full = postQuery(partitionOneQuery);
+    Assert.assertEquals(full.path("resultTable").path("rows").get(0).get(0).asLong(),
+        (long) NUM_PARTITION_ONE_DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+
+    // Partition pruning runs on top of the sampled set; the two segments kept by "firstTwo" hold no partition-1
+    // rows, so nothing should match.
+    JsonNode sampled = postQuery(withSampler(partitionOneQuery, "firstTwo"));
+    Assert.assertEquals(sampled.path("resultTable").path("rows").get(0).get(0).asLong(), 0L);
+    // NOTE(review): only asserted for the single-stage engine; presumably the multi-stage engine reports this stat
+    // differently -- confirm.
+    if (!useMultiStageQueryEngine) {
+      Assert.assertEquals(sampled.path("numSegmentsQueried").asInt(), 0);
+    }
+  }
+
+  /** Per-day row counts over the whole table, ordered by day. */
+  private String groupByDayQuery() {
+    return "SELECT " + DAYS_SINCE_EPOCH_COL + ", COUNT(*) AS cnt FROM " + getTableName() + " GROUP BY "
+        + DAYS_SINCE_EPOCH_COL + " ORDER BY " + DAYS_SINCE_EPOCH_COL;
+  }
+
+  /** Prefixes {@code query} with a {@code SET sampler='...'} statement selecting the given table sampler. */
+  private static String withSampler(String query, String samplerName) {
+    return "SET sampler='" + samplerName + "'; " + query;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 7446bb2e88f..a65d1cfcfc6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -34,11 +34,13 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions;
 import org.apache.pinot.calcite.rel.rules.TableOptions;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.routing.LogicalTableRouteInfo;
 import org.apache.pinot.core.routing.LogicalTableRouteProvider;
 import org.apache.pinot.core.routing.RoutingManager;
@@ -445,7 +447,7 @@ public class WorkerManager {
     Map<String, String> tableOptions = metadata.getTableOptions();
     if (tableOptions != null) {
       if 
(Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED)))
 {
-        setSegmentsForReplicatedLeafFragment(metadata);
+        setSegmentsForReplicatedLeafFragment(metadata, context);
         return;
       }
 
@@ -485,7 +487,8 @@ public class WorkerManager {
   private void 
assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata metadata,
       DispatchablePlanContext context) {
     String tableName = metadata.getScannedTables().get(0);
-    Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, 
context.getRequestId());
+    Map<String, RoutingTable> routingTableMap =
+        getRoutingTable(tableName, context.getRequestId(), 
context.getPlannerContext().getOptions());
     Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find 
routing entries for table: %s", tableName);
 
     // acquire time boundary info if it is a hybrid table.
@@ -553,40 +556,52 @@ public class WorkerManager {
    * @return keyed-map from table type(s) to routing table(s).
    */
   private Map<String, RoutingTable> getRoutingTable(String tableName, long 
requestId) {
+    // Convenience overload: no query options are forwarded to routing.
+    return getRoutingTable(tableName, requestId, Map.of());
+  }
+
+  /**
+   * Same as {@link #getRoutingTable(String, long)}, but forwards the given query options to the routing request so
+   * option-driven routing (e.g. the table sampler option) is applied.
+   *
+   * @param tableName raw table name (both OFFLINE and REALTIME are looked up) or table name with type
+   * @param requestId request id passed through to the routing manager
+   * @param queryOptions query options to attach to the routing request; may be empty
+   * @return map keyed by table type name to the routing table of that type; empty when no routing table is found
+   */
+  private Map<String, RoutingTable> getRoutingTable(String tableName, long 
requestId,
+      Map<String, String> queryOptions) {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType == null) {
       // Raw table name
       Map<String, RoutingTable> routingTableMap = new HashMap<>(4);
       RoutingTable offlineRoutingTable =
-          
getRoutingTableHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName), 
requestId);
+          
getRoutingTableHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName), 
requestId, queryOptions);
       if (offlineRoutingTable != null) {
         routingTableMap.put(TableType.OFFLINE.name(), offlineRoutingTable);
       }
       RoutingTable realtimeRoutingTable =
-          
getRoutingTableHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName), 
requestId);
+          
getRoutingTableHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName), 
requestId, queryOptions);
       if (realtimeRoutingTable != null) {
         routingTableMap.put(TableType.REALTIME.name(), realtimeRoutingTable);
       }
       return routingTableMap;
     } else {
       // Table name with type
-      RoutingTable routingTable = getRoutingTableHelper(tableName, requestId);
+      RoutingTable routingTable = getRoutingTableHelper(tableName, requestId, 
queryOptions);
       return routingTable != null ? Map.of(tableType.name(), routingTable) : 
Map.of();
     }
   }
 
   @Nullable
-  private RoutingTable getRoutingTableHelper(String tableNameWithType, long 
requestId) {
-    return _routingManager.getRoutingTable(
-        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + 
tableNameWithType + "\""), requestId);
+  private RoutingTable getRoutingTableHelper(String tableNameWithType, long 
requestId,
+      Map<String, String> queryOptions) {
+    // Build a synthetic "SELECT *" request for the table; it is only handed to the routing manager below.
+    // Returns null when the routing manager has no routing table for it (callers skip that table type).
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + 
tableNameWithType + "\"");
+    if (MapUtils.isNotEmpty(queryOptions) && brokerRequest.isSetPinotQuery()) {
+      // Ensure query options (e.g. sampler) are visible to routing selection.
+      // The map is copied so the broker request does not share the caller's options instance.
+      brokerRequest.getPinotQuery().setQueryOptions(new 
HashMap<>(queryOptions));
+    }
+    return _routingManager.getRoutingTable(brokerRequest, requestId);
   }
 
   // --------------------------------------------------------------------------
   // Replicated non-partitioned leaf stage assignment
   // --------------------------------------------------------------------------
-  private void setSegmentsForReplicatedLeafFragment(DispatchablePlanMetadata 
metadata) {
+  private void setSegmentsForReplicatedLeafFragment(DispatchablePlanMetadata 
metadata,
+      DispatchablePlanContext context) {
     String tableName = metadata.getScannedTables().get(0);
-    Map<String, List<String>> segmentsMap = getSegments(tableName);
+    Map<String, List<String>> segmentsMap = getSegments(tableName, 
context.getPlannerContext().getOptions());
     Preconditions.checkState(!segmentsMap.isEmpty(), "Unable to find segments 
for table: %s", tableName);
 
     // Acquire time boundary info if it is a hybrid table.
@@ -609,31 +624,34 @@ public class WorkerManager {
    * Returns the segments for the given table, keyed by table type.
    * TODO: It doesn't handle unavailable segments.
    */
-  private Map<String, List<String>> getSegments(String tableName) {
+  private Map<String, List<String>> getSegments(String tableName, Map<String, 
String> queryOptions) {
+    // Resolve the sampler name once (null when there are no query options) and apply it to every table type below.
+    String samplerName = MapUtils.isNotEmpty(queryOptions) ? 
QueryOptionsUtils.getTableSampler(queryOptions) : null;
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType == null) {
       // Raw table name
       Map<String, List<String>> segmentsMap = new HashMap<>(4);
-      List<String> offlineSegments = 
setSegmentsHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+      List<String> offlineSegments =
+          
setSegmentsHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName), 
samplerName);
       if (CollectionUtils.isNotEmpty(offlineSegments)) {
         segmentsMap.put(TableType.OFFLINE.name(), offlineSegments);
       }
-      List<String> realtimeSegments = 
setSegmentsHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+      List<String> realtimeSegments =
+          
setSegmentsHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName), 
samplerName);
       if (CollectionUtils.isNotEmpty(realtimeSegments)) {
         segmentsMap.put(TableType.REALTIME.name(), realtimeSegments);
       }
       return segmentsMap;
     } else {
       // Table name with type
-      List<String> segments = setSegmentsHelper(tableName);
+      List<String> segments = setSegmentsHelper(tableName, samplerName);
       return CollectionUtils.isNotEmpty(segments) ? Map.of(tableType.name(), 
segments) : Map.of();
     }
   }
 
   @Nullable
-  private List<String> setSegmentsHelper(String tableNameWithType) {
+  private List<String> setSegmentsHelper(String tableNameWithType, @Nullable 
String samplerName) {
+    // Despite the "set" prefix, this only looks up segments: a synthetic "SELECT *" request for the table and the
+    // optional sampler name are forwarded to the routing manager as-is.
     return _routingManager.getSegments(
-        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + 
tableNameWithType + "\""));
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + 
tableNameWithType + "\""), samplerName);
   }
 
   private void 
assignWorkersToNonPartitionedLeafFragmentForLogicalTable(DispatchablePlanMetadata
 metadata,
@@ -647,15 +665,22 @@ public class WorkerManager {
     }
     BrokerRequest offlineBrokerRequest = null;
     BrokerRequest realtimeBrokerRequest = null;
+    Map<String, String> queryOptions = 
context.getPlannerContext().getOptions();
 
     if (logicalTableRouteInfo.hasOffline()) {
       offlineBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
           "SELECT * FROM \"" + logicalTableRouteInfo.getOfflineTableName() + 
"\"");
+      if (MapUtils.isNotEmpty(queryOptions) && 
offlineBrokerRequest.isSetPinotQuery()) {
+        offlineBrokerRequest.getPinotQuery().setQueryOptions(new 
HashMap<>(queryOptions));
+      }
     }
 
     if (logicalTableRouteInfo.hasRealtime()) {
       realtimeBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
           "SELECT * FROM \"" + logicalTableRouteInfo.getRealtimeTableName() + 
"\"");
+      if (MapUtils.isNotEmpty(queryOptions) && 
realtimeBrokerRequest.isSetPinotQuery()) {
+        realtimeBrokerRequest.getPinotQuery().setQueryOptions(new 
HashMap<>(queryOptions));
+      }
     }
 
     tableRouteProvider.calculateRoutes(logicalTableRouteInfo, _routingManager, 
offlineBrokerRequest,
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
index 140da3d22c5..a739e183e70 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
@@ -80,7 +80,7 @@ public class CLPForwardIndexCreatorTest implements 
PinotBuffersAfterMethodCheckR
     TableConfig tableConfig =
         new TableConfig("mytable", TableType.REALTIME.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig(null, null, null), new IndexingConfig(), new 
TableCustomConfig(null), null, null, null,
-            null, null, null, null, null, null, null, null, false, null, null, 
null);
+            null, null, null, null, null, null, null, null, false, null, null, 
null, null);
     List<FieldConfig> fieldConfigList = new ArrayList<>();
     fieldConfigList.add(new FieldConfig("column1", 
FieldConfig.EncodingType.RAW, Collections.EMPTY_LIST,
         FieldConfig.CompressionCodec.CLP, Collections.EMPTY_MAP));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index ba2495355d3..35a1c6208c6 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -3224,7 +3224,7 @@ public class TableConfigUtilsTest {
         new TableConfig("table", TableType.OFFLINE.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
             null, null, null, null, Map.of("OFFLINE", config), null, null, 
null, null, null, null, false, null, null,
-            null);
+            null, null);
 
     // Should not throw
     TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -3240,7 +3240,7 @@ public class TableConfigUtilsTest {
         new TableConfig("table", TableType.REALTIME.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
             null, null, null, null, Map.of("CONSUMING", config), null, null, 
null, null, null, null, false, null, null,
-            null);
+            null, null);
 
     // Should not throw
     TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -3251,7 +3251,7 @@ public class TableConfigUtilsTest {
     TableConfig tableConfig =
         new TableConfig("table", TableType.OFFLINE.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
-            null, null, null, null, null, null, null, null, null, null, null, 
false, null, null, null);
+            null, null, null, null, null, null, null, null, null, null, null, 
false, null, null, null, null);
 
     assertThrows(IllegalStateException.class,
         () -> 
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3262,7 +3262,7 @@ public class TableConfigUtilsTest {
     TableConfig tableConfig =
         new TableConfig("table", TableType.REALTIME.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
-            null, null, null, null, null, null, null, null, null, null, null, 
false, null, null, null);
+            null, null, null, null, null, null, null, null, null, null, null, 
false, null, null, null, null);
 
     assertThrows(IllegalStateException.class,
         () -> 
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3278,7 +3278,7 @@ public class TableConfigUtilsTest {
         new TableConfig("table", TableType.OFFLINE.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
             null, null, null, null, Map.of("OFFLINE", config), null, null, 
null, null, null, null, false, null, null,
-            null);
+            null, null);
 
     assertThrows(IllegalStateException.class,
         () -> 
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3294,7 +3294,7 @@ public class TableConfigUtilsTest {
         new TableConfig("table", TableType.REALTIME.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
             null, null, null, null, Map.of("CONSUMING", config), null, null, 
null, null, null, null, false, null, null,
-            null);
+            null, null);
 
     assertThrows(IllegalStateException.class,
         () -> 
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3310,7 +3310,7 @@ public class TableConfigUtilsTest {
         new TableConfig("table", TableType.OFFLINE.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
             null, null, null, null, Map.of("OFFLINE", config), null, null, 
null, null, null, null, false, null, null,
-            null);
+            null, null);
 
     assertThrows(IllegalStateException.class,
         () -> 
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3326,7 +3326,7 @@ public class TableConfigUtilsTest {
         new TableConfig("table", TableType.REALTIME.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
             null, null, null, null, Map.of("CONSUMING", config), null, null, 
null, null, null, null, false, null, null,
-            null);
+            null, null);
 
     assertThrows(IllegalStateException.class,
         () -> 
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3337,7 +3337,7 @@ public class TableConfigUtilsTest {
     TableConfig tableConfig =
         new TableConfig("table", TableType.OFFLINE.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
-            null, null, null, null, null, null, null, null, null, null, null, 
true, null, null, null);
+            null, null, null, null, null, null, null, null, null, null, null, 
true, null, null, null, null);
 
     // Should not throw
     TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -3350,7 +3350,8 @@ public class TableConfigUtilsTest {
     TableConfig tableConfig =
         new TableConfig("table", TableType.OFFLINE.name(), new 
SegmentsValidationAndRetentionConfig(),
             new TenantConfig("DefaultTenant", "DefaultTenant", null), new 
IndexingConfig(), new TableCustomConfig(null),
-            null, null, null, null, null, null, null, null, null, null, null, 
true, null, instancePartitionsMap, null);
+            null, null, null, null, null, null, null, null, null, null, null, 
true, null, instancePartitionsMap, null,
+            null);
 
     // Should not throw
     TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/tablesampler/TableSamplerProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/tablesampler/TableSamplerProvider.java
new file mode 100644
index 00000000000..21f499fc26b
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/tablesampler/TableSamplerProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.annotations.tablesampler;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Marks a class as a table sampler provider that the broker can discover through annotation scanning.
+ *
+ * <p>The annotation is retained at runtime so it can be read reflectively. The {@link #name()} alias can be used as
+ * the sampler {@code type} in a table's sampler config.
+ *
+ * NOTE:
+ *   - The annotated class must implement {@code org.apache.pinot.broker.routing.tablesampler.TableSampler}.
+ *   - The annotated class must be public and concrete with a no-arg constructor.
+ *   - The class must be discoverable via the packages configured with
+ *     {@code pinot.broker.table.sampler.annotation.packages}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TableSamplerProvider {
+
+  /**
+   * Alias name for the sampler; a table sampler config may reference this alias as its {@code type}.
+   */
+  String name();
+
+  /**
+   * Whether this provider is enabled; defaults to {@code true}.
+   * NOTE(review): presumably providers with {@code enabled = false} are ignored during discovery - confirm in
+   * TableSamplerFactory.
+   */
+  boolean enabled() default true;
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 6f0e132fa7e..5df33cc7b92 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -23,14 +23,19 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -58,6 +63,7 @@ public class TableConfig extends BaseJsonConfig {
   public static final String TIER_CONFIGS_LIST_KEY = "tierConfigs";
   public static final String TUNER_CONFIG_LIST_KEY = "tunerConfigs";
   public static final String TIER_OVERWRITES_KEY = "tierOverwrites";
+  public static final String TABLE_SAMPLERS_KEY = "tableSamplers";
 
   // Double underscore is reserved for real-time segment name delimiter
   public static final String TABLE_NAME_FORBIDDEN_SUBSTRING = "__";
@@ -113,6 +119,9 @@ public class TableConfig extends BaseJsonConfig {
   @JsonPropertyDescription(value = "Configs for Table config tuner")
   private List<TunerConfig> _tunerConfigList;
 
+  @JsonPropertyDescription(value = "Configs for table samplers")
+  private List<TableSamplerConfig> _tableSamplers;
+
   @JsonCreator
   public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) 
String tableName,
       @JsonProperty(value = TABLE_TYPE_KEY, required = true) String tableType,
@@ -138,7 +147,8 @@ public class TableConfig extends BaseJsonConfig {
       @JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY) @Nullable
       Map<InstancePartitionsType, String> instancePartitionsMap,
       @JsonProperty(SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
-      Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap) {
+      Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap,
+      @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List<TableSamplerConfig> 
tableSamplers) {
     Preconditions.checkArgument(tableName != null, "'tableName' must be 
configured");
     
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
         "'tableName' cannot contain double underscore ('__')");
@@ -169,6 +179,7 @@ public class TableConfig extends BaseJsonConfig {
     _tunerConfigList = tunerConfigList;
     _instancePartitionsMap = instancePartitionsMap;
     _segmentAssignmentConfigMap = segmentAssignmentConfigMap;
+    _tableSamplers = sanitizeAndValidateTableSamplers(tableSamplers);
   }
 
   public TableConfig(TableConfig tableConfig) {
@@ -193,6 +204,7 @@ public class TableConfig extends BaseJsonConfig {
     _tunerConfigList = tableConfig.getTunerConfigsList();
     _instancePartitionsMap = tableConfig.getInstancePartitionsMap();
     _segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
+    _tableSamplers = 
sanitizeAndValidateTableSamplers(tableConfig.getTableSamplers());
   }
 
   @JsonProperty(TABLE_NAME_KEY)
@@ -254,6 +266,46 @@ public class TableConfig extends BaseJsonConfig {
     _customConfig = customConfig;
   }
 
+  /**
+   * Returns the table sampler configs, or {@code null} when none are configured (empty lists are normalized to
+   * {@code null} on assignment).
+   */
+  @JsonProperty(TABLE_SAMPLERS_KEY)
+  @Nullable
+  public List<TableSamplerConfig> getTableSamplers() {
+    return this._tableSamplers;
+  }
+
+  /**
+   * Replaces the table sampler configs after sanitizing and validating them.
+   *
+   * @param tableSamplers sampler configs to store; {@code null} or an empty list clears them
+   * @throws IllegalArgumentException if a sampler name is blank or duplicated (ignoring case and surrounding spaces)
+   */
+  public void setTableSamplers(@Nullable List<TableSamplerConfig> tableSamplers) {
+    List<TableSamplerConfig> validatedSamplers = sanitizeAndValidateTableSamplers(tableSamplers);
+    this._tableSamplers = validatedSamplers;
+  }
+
+  /**
+   * Validates sampler configs and returns the list to store on this table config.
+   *
+   * <p>{@code null} entries are dropped. Names are compared after {@code trim()} and lower-casing with
+   * {@link Locale#ROOT}, so names differing only in case or surrounding whitespace are duplicates. Configs whose name
+   * is {@code null} are kept without a name check. The configs themselves are not modified.
+   *
+   * @param tableSamplers sampler configs to validate; may be {@code null}
+   * @return a new list of the non-null configs in their original order, or {@code null} if there are none
+   * @throws IllegalArgumentException if a name is blank or two names collide after normalization
+   */
+  @Nullable
+  private static List<TableSamplerConfig> sanitizeAndValidateTableSamplers(
+      @Nullable List<TableSamplerConfig> tableSamplers) {
+    if (tableSamplers == null) {
+      return null;
+    }
+    List<TableSamplerConfig> accepted = new ArrayList<>(tableSamplers.size());
+    Set<String> usedNames = new HashSet<>();
+    for (TableSamplerConfig samplerConfig : tableSamplers) {
+      if (samplerConfig == null) {
+        continue;
+      }
+      String rawName = samplerConfig.getName();
+      if (rawName != null) {
+        String trimmedName = rawName.trim();
+        if (trimmedName.isEmpty()) {
+          throw new IllegalArgumentException("Table sampler name cannot be blank");
+        }
+        // Set#add returns false when the normalized name was already seen.
+        if (!usedNames.add(trimmedName.toLowerCase(Locale.ROOT))) {
+          throw new IllegalArgumentException("Duplicate table sampler name: " + trimmedName);
+        }
+      }
+      accepted.add(samplerConfig);
+    }
+    return accepted.isEmpty() ? null : accepted;
+  }
+
   @JsonProperty(QUOTA_CONFIG_KEY)
   @Nullable
   public QuotaConfig getQuotaConfig() {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java
new file mode 100644
index 00000000000..bf804c173c3
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.sampler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configuration for a single table sampler.
+ *
+ * <p>Samplers are declared in {@link org.apache.pinot.spi.config.table.TableConfig} and selected at query time via a
+ * query option. The sampler {@code type} can be one of the built-in types, a fully qualified class name, or an alias
+ * discovered via broker-side annotation scanning. Additional annotation packages can be configured via
+ * {@code pinot.broker.table.sampler.annotation.packages}.
+ *
+ * <p>JSON fields: {@code name} and {@code type} (required) and {@code properties} (optional string map).
+ */
+public class TableSamplerConfig extends BaseJsonConfig {
+  // Name used to select this sampler at query time.
+  private final String _name;
+  // Built-in sampler type, fully qualified class name, or annotation-discovered alias.
+  private final String _type;
+  // Optional sampler-specific settings; null when not configured.
+  private final Map<String, String> _properties;
+
+  @JsonCreator
+  public TableSamplerConfig(@JsonProperty(value = "name", required = true) String samplerName,
+      @JsonProperty(value = "type", required = true) String samplerType,
+      @JsonProperty("properties") @Nullable Map<String, String> samplerProperties) {
+    _type = samplerType;
+    _name = samplerName;
+    _properties = samplerProperties;
+  }
+
+  /** Returns the sampler name. */
+  public String getName() {
+    return _name;
+  }
+
+  /** Returns the sampler type. */
+  public String getType() {
+    return _type;
+  }
+
+  /** Returns the sampler properties, or {@code null} if none were configured. */
+  @Nullable
+  public Map<String, String> getProperties() {
+    return _properties;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2c97a55df07..bc05f2c7720 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -336,6 +336,9 @@ public class CommonConstants {
     public static final String ACCESS_CONTROL_CONFIG_PREFIX = 
"pinot.broker.access.control";
     public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
     public static final String EVENT_LISTENER_CONFIG_PREFIX = 
"pinot.broker.event.listener";
+    // Prefix for table sampler configs:
+    // - pinot.broker.table.sampler.annotation.packages=<comma-separated 
packages>
+    public static final String TABLE_SAMPLER_CONFIG_PREFIX = 
"pinot.broker.table.sampler";
     public static final String CONFIG_OF_METRICS_NAME_PREFIX = 
"pinot.broker.metrics.prefix";
     public static final String DEFAULT_METRICS_NAME_PREFIX = "pinot.broker.";
 
@@ -647,6 +650,7 @@ public class CommonConstants {
         public static final String USE_STAR_TREE = "useStarTree";
         public static final String SCAN_STAR_TREE_NODES = "scanStarTreeNodes";
         public static final String ROUTING_OPTIONS = "routingOptions";
+        public static final String TABLE_SAMPLER = "sampler";
         public static final String USE_SCAN_REORDER_OPTIMIZATION = 
"useScanReorderOpt";
         public static final String MAX_EXECUTION_THREADS = 
"maxExecutionThreads";
         public static final String COLLECT_GC_STATS = "collectGCStats";
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 07d518fb4b7..52bc8023f7e 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -51,6 +51,7 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
 
 
 public class TableConfigBuilder {
@@ -125,6 +126,7 @@ public class TableConfigBuilder {
   private TableTaskConfig _taskConfig;
   private RoutingConfig _routingConfig;
   private QueryConfig _queryConfig;
+  private List<TableSamplerConfig> _tableSamplers;
   private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
   private Map<InstancePartitionsType, String> _instancePartitionsMap;
   private Map<String, SegmentAssignmentConfig> _segmentAssignmentConfigMap;
@@ -412,6 +414,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  /**
+   * Sets the table sampler configs that are passed to the {@link TableConfig} constructor when the config is built.
+   */
+  public TableConfigBuilder setTableSamplers(List<TableSamplerConfig> tableSamplers) {
+    this._tableSamplers = tableSamplers;
+    return this;
+  }
+
   public TableConfigBuilder setInstanceAssignmentConfigMap(
       Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
     _instanceAssignmentConfigMap = instanceAssignmentConfigMap;
@@ -538,6 +545,6 @@ public class TableConfigBuilder {
     return new TableConfig(_tableName, _tableType.toString(), 
validationConfig, tenantConfig, indexingConfig,
         _customConfig, _quotaConfig, _taskConfig, _routingConfig, 
_queryConfig, _instanceAssignmentConfigMap,
         _fieldConfigList, _upsertConfig, _dedupConfig, _dimensionTableConfig, 
_ingestionConfig, _tierConfigList,
-        _isDimTable, _tunerConfigList, _instancePartitionsMap, 
_segmentAssignmentConfigMap);
+        _isDimTable, _tunerConfigList, _instancePartitionsMap, 
_segmentAssignmentConfigMap, _tableSamplers);
   }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
index 4143b7ce2df..82119214e7a 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
@@ -27,8 +27,10 @@ import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -104,4 +106,36 @@ public class TableConfigTest {
     assertEquals(config, copy);
     assertEquals(config.toJsonString(), copy.toJsonString());
   }
+
+  @Test
+  public void testDuplicateTableSamplerNamesRejected() {
+    TableConfig config = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    List<TableSamplerConfig> duplicateSamplers = List.of(
+        new TableSamplerConfig("sampler1", "firstN", Map.of("numSegments", 
"10")),
+        new TableSamplerConfig("sampler1", "firstN", Map.of("numSegments", 
"1")));
+    IllegalArgumentException e = 
Assert.expectThrows(IllegalArgumentException.class,
+        () -> config.setTableSamplers(duplicateSamplers));
+    assertTrue(e.getMessage().contains("Duplicate table sampler name: 
sampler1"));
+  }
+
+  @Test
+  public void testDuplicateTableSamplerNamesRejectedAfterNormalization() {
+    TableConfig config = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    List<TableSamplerConfig> duplicateSamplers = List.of(
+        new TableSamplerConfig("sampler1", "firstN", Map.of("numSegments", 
"10")),
+        new TableSamplerConfig(" Sampler1 ", "firstN", Map.of("numSegments", 
"1")));
+    IllegalArgumentException e = 
Assert.expectThrows(IllegalArgumentException.class,
+        () -> config.setTableSamplers(duplicateSamplers));
+    assertTrue(e.getMessage().contains("Duplicate table sampler name: 
Sampler1"));
+  }
+
+  @Test
+  public void testBlankTableSamplerNameRejected() {
+    TableConfig config = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    List<TableSamplerConfig> samplers =
+        List.of(new TableSamplerConfig("  ", "firstN", Map.of("numSegments", 
"10")));
+    IllegalArgumentException e =
+        Assert.expectThrows(IllegalArgumentException.class, () -> 
config.setTableSamplers(samplers));
+    assertTrue(e.getMessage().contains("Table sampler name cannot be blank"));
+  }
 }
diff --git 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 5558a95609a..80e59c59493 100644
--- 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -108,6 +108,15 @@
   "metadata": {
     "customConfigs": {}
   },
+  "tableSamplers": [
+    {
+      "name": "small",
+      "type": "firstN",
+      "properties": {
+        "numSegments": "1"
+      }
+    }
+  ],
   "ingestionConfig": {
     "transformConfigs": [
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to