This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b8eafc1bf54 Add pluggable table samplers with precomputed broker
routing entries and tableSampler query option (#17532)
b8eafc1bf54 is described below
commit b8eafc1bf54d8a80770a94be2e0fe59e7e04e13c
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Feb 11 23:07:06 2026 -0800
Add pluggable table samplers with precomputed broker routing entries and
tableSampler query option (#17532)
---
.../broker/broker/helix/BaseBrokerStarter.java | 2 +
.../routing/manager/BaseBrokerRoutingManager.java | 166 +++++++++++++++++--
.../manager/MultiClusterRoutingManager.java | 9 +-
.../tablesampler/FirstNSegmentsTableSampler.java | 76 +++++++++
.../broker/routing/tablesampler/TableSampler.java | 48 ++++++
.../routing/tablesampler/TableSamplerFactory.java | 164 +++++++++++++++++++
.../routing/manager/BrokerRoutingManagerTest.java | 58 +++++++
.../manager/MultiClusterRoutingManagerTest.java | 33 +++-
.../tablesampler/TableSamplerFactoryTest.java | 98 ++++++++++++
.../external/ExternalAnnotatedSampler.java | 42 +++++
.../common/utils/config/QueryOptionsUtils.java | 9 +-
.../common/utils/config/TableConfigSerDeUtils.java | 14 +-
.../common/utils/config/QueryOptionsUtilsTest.java | 15 ++
.../spark/v3/datasource/PinotDataWriter.scala | 2 +-
.../apache/pinot/core/routing/RoutingManager.java | 9 ++
.../tests/custom/TableSamplerIntegrationTest.java | 175 +++++++++++++++++++++
.../apache/pinot/query/routing/WorkerManager.java | 57 +++++--
.../index/creator/CLPForwardIndexCreatorTest.java | 2 +-
.../segment/local/utils/TableConfigUtilsTest.java | 21 +--
.../tablesampler/TableSamplerProvider.java | 46 ++++++
.../apache/pinot/spi/config/table/TableConfig.java | 54 ++++++-
.../config/table/sampler/TableSamplerConfig.java | 62 ++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
.../spi/utils/builder/TableConfigBuilder.java | 9 +-
.../pinot/spi/config/table/TableConfigTest.java | 34 ++++
.../airlineStats_offline_table_config.json | 9 ++
26 files changed, 1163 insertions(+), 55 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index baa96367ff5..f531faf2aee 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -57,6 +57,7 @@ import
org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
import
org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
+import org.apache.pinot.broker.routing.tablesampler.TableSamplerFactory;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.audit.AuditServiceBinder;
import org.apache.pinot.common.config.DefaultClusterConfigChangeHandler;
@@ -181,6 +182,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers,
_clusterName, ServiceRole.BROKER);
applyCustomConfigs(brokerConf);
+
TableSamplerFactory.init(_brokerConf.subset(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX));
BrokerContext.getInstance().setQueryOperatorFactoryProvider(createQueryOperatorFactoryProvider(_brokerConf));
PinotInsecureMode.setPinotInInsecureMode(
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
index 320fc2eebbb..2223f7c8325 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
@@ -19,12 +19,14 @@
package org.apache.pinot.broker.routing.manager;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +40,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
@@ -63,12 +67,15 @@ import
org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
import org.apache.pinot.broker.routing.segmentpruner.SegmentPrunerFactory;
import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
+import org.apache.pinot.broker.routing.tablesampler.TableSampler;
+import org.apache.pinot.broker.routing.tablesampler.TableSamplerFactory;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.SegmentsToQuery;
@@ -83,6 +90,7 @@ import
org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -448,14 +456,15 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
// Update routing entry for all tables
for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+ String tableNameWithType = routingEntry.getTableNameWithType();
try {
- Object tableLock =
getRoutingTableBuildLock(routingEntry.getTableNameWithType());
+ Object tableLock = getRoutingTableBuildLock(tableNameWithType);
synchronized (tableLock) {
routingEntry.onInstancesChange(_routableServers, changedServers);
}
} catch (Exception e) {
LOGGER.error("Caught unexpected exception while updating routing entry
on instances change for table: {}",
- routingEntry.getTableNameWithType(), e);
+ tableNameWithType, e);
}
}
long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
@@ -524,14 +533,15 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
_routableServers = routableServers;
List<String> changedServers = Collections.singletonList(instanceId);
for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+ String tableNameWithType = routingEntry.getTableNameWithType();
try {
- Object tableLock =
getRoutingTableBuildLock(routingEntry.getTableNameWithType());
+ Object tableLock = getRoutingTableBuildLock(tableNameWithType);
synchronized (tableLock) {
routingEntry.onInstancesChange(_routableServers, changedServers);
}
} catch (Exception e) {
LOGGER.error("Caught unexpected exception while updating routing entry
when excluding server: {} for table: {}",
- instanceId, routingEntry.getTableNameWithType(), e);
+ instanceId, tableNameWithType, e);
}
}
LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing
entries)", instanceId,
@@ -568,14 +578,15 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
_routableServers = routableServers;
List<String> changedServers = Collections.singletonList(instanceId);
for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+ String tableNameWithType = routingEntry.getTableNameWithType();
try {
- Object tableLock =
getRoutingTableBuildLock(routingEntry.getTableNameWithType());
+ Object tableLock = getRoutingTableBuildLock(tableNameWithType);
synchronized (tableLock) {
routingEntry.onInstancesChange(_routableServers, changedServers);
}
} catch (Exception e) {
LOGGER.error("Caught unexpected exception while updating routing entry
when including server: {} for table: {}",
- instanceId, routingEntry.getTableNameWithType(), e);
+ instanceId, tableNameWithType, e);
}
}
LOGGER.info("Included server: {} to routing in {}ms (updated {} routing
entries)", instanceId,
@@ -810,10 +821,52 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
}
segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
+ // Build table sampler contexts keyed by normalized sampler name.
+ Map<String, SamplerInfo> samplerInfos = Map.of();
+ List<TableSamplerConfig> tableSamplerConfigs =
tableConfig.getTableSamplers();
+ if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+ Map<String, SamplerInfo> configuredSamplerInfos =
Maps.newHashMapWithExpectedSize(tableSamplerConfigs.size());
+ for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+ String samplerName = samplerConfig.getName();
+ String samplerType = samplerConfig.getType();
+ if (StringUtils.isBlank(samplerName) ||
StringUtils.isBlank(samplerType)) {
+ LOGGER.warn("Skipping invalid table sampler config for table: {},
samplerName: {}, samplerType: {}",
+ tableNameWithType, samplerName, samplerType);
+ continue;
+ }
+ String normalizedSamplerName = normalizeSamplerName(samplerName);
+ if (configuredSamplerInfos.containsKey(normalizedSamplerName)) {
+ LOGGER.warn("Skipping duplicate normalized table sampler name:
'{}' for table: {}", samplerName,
+ tableNameWithType);
+ continue;
+ }
+ try {
+ TableSampler sampler = TableSamplerFactory.create(samplerType);
+ sampler.init(tableConfig, samplerConfig, _propertyStore);
+ Set<String> samplerPreSelectedOnlineSegments =
sampler.sampleSegments(preSelectedOnlineSegments);
+ SegmentSelector samplerSegmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
+ samplerSegmentSelector.init(idealState, externalView,
samplerPreSelectedOnlineSegments);
+ InstanceSelector samplerInstanceSelector =
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
+ adaptiveServerSelector, _pinotConfig, _routableServers,
_enabledServerInstanceMap,
+ idealState, externalView,
samplerPreSelectedOnlineSegments);
+ configuredSamplerInfos.put(normalizedSamplerName,
+ new SamplerInfo(sampler, samplerSegmentSelector,
samplerInstanceSelector));
+ } catch (Exception e) {
+ LOGGER.error("Caught unexpected exception while building routing
for table sampler: {} for table: {}",
+ samplerName, tableNameWithType, e);
+ }
+ }
+ if (!configuredSamplerInfos.isEmpty()) {
+ samplerInfos = configuredSamplerInfos;
+ }
+ }
+
RoutingEntry routingEntry =
new RoutingEntry(tableNameWithType, idealStatePath,
externalViewPath, segmentPreSelector, segmentSelector,
segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
- timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
!idealState.isEnabled());
+ timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
samplerInfos,
+ !idealState.isEnabled());
if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
LOGGER.info("Built routing for table: {}", tableNameWithType);
} else {
@@ -1048,7 +1101,9 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
if (routingEntry == null) {
return null;
}
- InstanceSelector.SelectionResult selectionResult =
routingEntry.calculateRouting(brokerRequest, requestId);
+ String samplerName = extractSamplerName(brokerRequest);
+ InstanceSelector.SelectionResult selectionResult =
+ routingEntry.calculateRouting(brokerRequest, requestId, samplerName);
return new RoutingTable(getServerInstanceToSegmentsMap(tableNameWithType,
selectionResult),
selectionResult.getUnavailableSegments(),
selectionResult.getNumPrunedSegments());
}
@@ -1086,9 +1141,30 @@ public abstract class BaseBrokerRoutingManager
implements RoutingManager, Cluste
@Nullable
@Override
public List<String> getSegments(BrokerRequest brokerRequest) {
+ return getSegments(brokerRequest, extractSamplerName(brokerRequest));
+ }
+
+ @Nullable
+ @Override
+ public List<String> getSegments(BrokerRequest brokerRequest, @Nullable
String samplerName) {
String tableNameWithType = brokerRequest.getQuerySource().getTableName();
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
- return routingEntry != null ? routingEntry.getSegments(brokerRequest) :
null;
+ if (routingEntry == null) {
+ return null;
+ }
+ return routingEntry.getSegments(brokerRequest, samplerName);
+ }
+
+ /**
+ * Normalizes a sampler name into the key form used by the sampler map: surrounding whitespace is
+ * trimmed and the result is lower-cased with {@link Locale#ROOT}, so names match case-insensitively.
+ *
+ * <p>The argument is dereferenced directly, so callers must pass a non-null name.
+ */
+ private static String normalizeSamplerName(String samplerName) {
+ return samplerName.trim().toLowerCase(Locale.ROOT);
+ }
+
+ /**
+ * Reads the table sampler name from the query options of a broker request.
+ *
+ * @param brokerRequest request whose PinotQuery query options are inspected
+ * @return whatever {@code QueryOptionsUtils.getTableSampler} returns for the query options, or
+ * {@code null} when the request has no PinotQuery set
+ */
+ @Nullable
+ static String extractSamplerName(BrokerRequest brokerRequest) {
+ if (!brokerRequest.isSetPinotQuery()) {
+ return null;
+ }
+ return
QueryOptionsUtils.getTableSampler(brokerRequest.getPinotQuery().getQueryOptions());
}
@Override
@@ -1172,6 +1248,18 @@ public abstract class BaseBrokerRoutingManager
implements RoutingManager, Cluste
}
}
+ /**
+ * Holder that bundles one configured table sampler with the segment selector and instance selector
+ * that were built for that sampler. All three references are final and assigned once in the
+ * constructor.
+ */
+ private static final class SamplerInfo {
+ // Sampler used to re-compute the segment subset.
+ final TableSampler _tableSampler;
+ // Segment selector dedicated to this sampler.
+ final SegmentSelector _segmentSelector;
+ // Instance selector dedicated to this sampler.
+ final InstanceSelector _instanceSelector;
+
+ SamplerInfo(TableSampler tableSampler, SegmentSelector segmentSelector,
InstanceSelector instanceSelector) {
+ _tableSampler = tableSampler;
+ _segmentSelector = segmentSelector;
+ _instanceSelector = instanceSelector;
+ }
+ }
+
private static class RoutingEntry {
final String _tableNameWithType;
final String _idealStatePath;
@@ -1182,6 +1270,7 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
final SegmentPartitionMetadataManager _partitionMetadataManager;
final InstanceSelector _instanceSelector;
final Long _queryTimeoutMs;
+ final Map<String, SamplerInfo> _samplerInfos;
final SegmentZkMetadataFetcher _segmentZkMetadataFetcher;
// Cache IdealState and ExternalView version for the last update
@@ -1197,7 +1286,7 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
InstanceSelector instanceSelector, int lastUpdateIdealStateVersion,
int lastUpdateExternalViewVersion,
SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable
TimeBoundaryManager timeBoundaryManager,
@Nullable SegmentPartitionMetadataManager partitionMetadataManager,
@Nullable Long queryTimeoutMs,
- boolean disabled) {
+ Map<String, SamplerInfo> samplerInfos, boolean disabled) {
_tableNameWithType = tableNameWithType;
_idealStatePath = idealStatePath;
_externalViewPath = externalViewPath;
@@ -1210,6 +1299,7 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
_timeBoundaryManager = timeBoundaryManager;
_partitionMetadataManager = partitionMetadataManager;
_queryTimeoutMs = queryTimeoutMs;
+ _samplerInfos = samplerInfos;
_segmentZkMetadataFetcher = segmentZkMetadataFetcher;
_disabled = disabled;
}
@@ -1248,6 +1338,40 @@ public abstract class BaseBrokerRoutingManager
implements RoutingManager, Cluste
return _disabled;
}
+ /**
+ * Propagates an assignment change to every configured sampler: the pre-selected online segments
+ * are re-sampled, and the sampled subset is handed to that sampler's segment selector and
+ * instance selector.
+ *
+ * <p>Each sampler is processed inside its own try/catch, so an exception from one sampler is
+ * logged and the loop continues with the next one.
+ */
+ private void updateSamplerInfos(IdealState idealState, ExternalView
externalView,
+ Set<String> preSelectedOnlineSegments) {
+ // Nothing to do for tables without samplers.
+ if (_samplerInfos.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<String, SamplerInfo> entry : _samplerInfos.entrySet()) {
+ String samplerName = entry.getKey();
+ SamplerInfo samplerInfo = entry.getValue();
+ try {
+ Set<String> samplerPreSelectedOnlineSegments =
+
samplerInfo._tableSampler.sampleSegments(preSelectedOnlineSegments);
+ samplerInfo._segmentSelector.onAssignmentChange(idealState,
externalView, samplerPreSelectedOnlineSegments);
+ samplerInfo._instanceSelector.onAssignmentChange(idealState,
externalView, samplerPreSelectedOnlineSegments);
+ } catch (Exception e) {
+ // NOTE(review): if the failure happens after the segment selector was updated, the instance
+ // selector keeps its previous state until the next successful update.
+ LOGGER.error("Caught unexpected exception while sampling segments
for sampler: {} for table: {}", samplerName,
+ _tableNameWithType, e);
+ }
+ }
+ }
+
+ /**
+ * Looks up the sampler context for the requested sampler name.
+ *
+ * @param samplerName sampler name as given by the caller; may be {@code null} or blank
+ * @return the matching {@code SamplerInfo}, or {@code null} when the name is blank or no sampler
+ * is registered under its normalized form
+ */
+ @Nullable
+ private SamplerInfo getSamplerInfo(@Nullable String samplerName) {
+ // A blank name means no sampler was requested; return without logging.
+ if (StringUtils.isBlank(samplerName)) {
+ return null;
+ }
+ SamplerInfo samplerInfo =
_samplerInfos.get(normalizeSamplerName(samplerName));
+ if (samplerInfo != null) {
+ return samplerInfo;
+ }
+ // Unknown sampler: warn on every such call, then return null.
+ LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling
back to default routing entry",
+ samplerName, _tableNameWithType);
+ return null;
+ }
+
// NOTE: The change gets applied in sequence, and before change applied to
all components, there could be some
// inconsistency between components, which is fine because the
inconsistency only exists for the newly changed
// segments and only lasts for a very short time.
@@ -1260,6 +1384,7 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
if (_timeBoundaryManager != null) {
_timeBoundaryManager.onAssignmentChange(idealState, externalView,
preSelectedOnlineSegments);
}
+ updateSamplerInfos(idealState, externalView, preSelectedOnlineSegments);
_lastUpdateIdealStateVersion = idealState.getStat().getVersion();
_lastUpdateExternalViewVersion = externalView.getStat().getVersion();
_disabled = !idealState.isEnabled();
@@ -1267,6 +1392,11 @@ public abstract class BaseBrokerRoutingManager
implements RoutingManager, Cluste
void onInstancesChange(Set<String> enabledInstances, List<String>
changedInstances) {
_instanceSelector.onInstancesChange(enabledInstances, changedInstances);
+ if (!_samplerInfos.isEmpty()) {
+ for (SamplerInfo samplerInfo : _samplerInfos.values()) {
+ samplerInfo._instanceSelector.onInstancesChange(enabledInstances,
changedInstances);
+ }
+ }
}
void refreshSegment(String segment) {
@@ -1276,8 +1406,12 @@ public abstract class BaseBrokerRoutingManager
implements RoutingManager, Cluste
}
}
- InstanceSelector.SelectionResult calculateRouting(BrokerRequest
brokerRequest, long requestId) {
- Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
+ InstanceSelector.SelectionResult calculateRouting(BrokerRequest
brokerRequest, long requestId,
+ @Nullable String samplerName) {
+ SamplerInfo samplerInfo = getSamplerInfo(samplerName);
+ SegmentSelector segmentSelector = samplerInfo != null ?
samplerInfo._segmentSelector : _segmentSelector;
+ InstanceSelector instanceSelector = samplerInfo != null ?
samplerInfo._instanceSelector : _instanceSelector;
+ Set<String> selectedSegments = segmentSelector.select(brokerRequest);
int numTotalSelectedSegments = selectedSegments.size();
if (!selectedSegments.isEmpty()) {
for (SegmentPruner segmentPruner : _segmentPruners) {
@@ -1287,7 +1421,7 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
int numPrunedSegments = numTotalSelectedSegments -
selectedSegments.size();
if (!selectedSegments.isEmpty()) {
InstanceSelector.SelectionResult selectionResult =
- _instanceSelector.select(brokerRequest, new
ArrayList<>(selectedSegments), requestId);
+ instanceSelector.select(brokerRequest, new
ArrayList<>(selectedSegments), requestId);
selectionResult.setNumPrunedSegments(numPrunedSegments);
return selectionResult;
} else {
@@ -1296,8 +1430,10 @@ public abstract class BaseBrokerRoutingManager
implements RoutingManager, Cluste
}
}
- List<String> getSegments(BrokerRequest brokerRequest) {
- Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
+ List<String> getSegments(BrokerRequest brokerRequest, @Nullable String
samplerName) {
+ SamplerInfo samplerInfo = getSamplerInfo(samplerName);
+ SegmentSelector segmentSelector = samplerInfo != null ?
samplerInfo._segmentSelector : _segmentSelector;
+ Set<String> selectedSegments = segmentSelector.select(brokerRequest);
if (!selectedSegments.isEmpty()) {
for (SegmentPruner segmentPruner : _segmentPruners) {
selectedSegments = segmentPruner.prune(brokerRequest,
selectedSegments);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
index 0f247786c76..265e25235b4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java
@@ -187,14 +187,19 @@ public class MultiClusterRoutingManager implements
RoutingManager {
@Override
public List<String> getSegments(BrokerRequest brokerRequest) {
+ return getSegments(brokerRequest,
BaseBrokerRoutingManager.extractSamplerName(brokerRequest));
+ }
+
+ @Override
+ public List<String> getSegments(BrokerRequest brokerRequest, @Nullable
String samplerName) {
List<String> combined = new ArrayList<>();
- List<String> localSegments =
_localClusterRoutingManager.getSegments(brokerRequest);
+ List<String> localSegments =
_localClusterRoutingManager.getSegments(brokerRequest, samplerName);
if (localSegments != null) {
combined.addAll(localSegments);
}
for (BaseBrokerRoutingManager remoteCluster :
_remoteClusterRoutingManagers) {
try {
- List<String> remoteSegments = remoteCluster.getSegments(brokerRequest);
+ List<String> remoteSegments = remoteCluster.getSegments(brokerRequest,
samplerName);
if (remoteSegments != null) {
combined.addAll(remoteSegments);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java
new file mode 100644
index 00000000000..5df93ce2f53
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+
+/**
+ * Selects the first N segments after sorting segment names lexicographically.
+ *
+ * <p>The class is annotated with {@link TableSamplerProvider} using the name {@code "firstN"}.
+ *
+ * <p>Config:
+ * <ul>
+ * <li>{@code properties.numSegments}: positive integer</li>
+ * </ul>
+ */
+@TableSamplerProvider(name = FirstNSegmentsTableSampler.TYPE)
+public class FirstNSegmentsTableSampler implements TableSampler {
+ // Name passed to the @TableSamplerProvider annotation on this class.
+ public static final String TYPE = "firstN";
+ // Key in the sampler config properties that holds the segment cap.
+ public static final String PROP_NUM_SEGMENTS = "numSegments";
+
+ // Maximum number of segments returned by sampleSegments(); assigned in init().
+ private int _numSegments;
+
+ /**
+ * Reads the required {@code numSegments} property from the sampler config and stores it as the
+ * segment cap. {@code tableConfig} and {@code propertyStore} are not used by this sampler.
+ *
+ * @throws IllegalArgumentException if the properties are empty, the key is missing, or the parsed
+ * value is not positive; a non-numeric value surfaces as the {@link NumberFormatException}
+ * thrown by {@link Integer#parseInt(String)}
+ */
+ @Override
+ public void init(TableConfig tableConfig, TableSamplerConfig samplerConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ Map<String, String> props = samplerConfig.getProperties();
+ if (MapUtils.isEmpty(props) || !props.containsKey(PROP_NUM_SEGMENTS)) {
+ throw new IllegalArgumentException(
+ "Missing required property '" + PROP_NUM_SEGMENTS + "' for table
sampler type '" + TYPE + "'");
+ }
+ _numSegments = Integer.parseInt(props.get(PROP_NUM_SEGMENTS));
+ if (_numSegments <= 0) {
+ throw new IllegalArgumentException("'" + PROP_NUM_SEGMENTS + "' must be
positive");
+ }
+ }
+
+ /**
+ * Returns at most {@code numSegments} segment names.
+ *
+ * @param onlineSegments candidate segment names; copied before sorting, never modified
+ * @return an empty set for empty input, the input set itself when it holds no more than
+ * {@code numSegments} entries, otherwise a new {@link HashSet} with the first
+ * {@code numSegments} names in natural {@link String} order
+ */
+ @Override
+ public Set<String> sampleSegments(Set<String> onlineSegments) {
+ if (onlineSegments.isEmpty()) {
+ return Collections.emptySet();
+ }
+ // Nothing to cut: the caller's set instance is returned as-is (no copy).
+ if (onlineSegments.size() <= _numSegments) {
+ return onlineSegments;
+ }
+ List<String> sorted = new ArrayList<>(onlineSegments);
+ Collections.sort(sorted);
+ return new HashSet<>(sorted.subList(0, _numSegments));
+ }
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java
new file mode 100644
index 00000000000..1bc88ffc1ba
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+
+/**
+ * A {@code TableSampler} deterministically selects a subset of segments from the set of online
+ * segments for a table.
+ *
+ * <p>Selection is performed during routing table build/update so there is no additional per-query
+ * overhead beyond selecting the pre-built routing entry.
+ */
+public interface TableSampler {
+
+ /**
+ * Initializes the sampler for a specific table and sampler config.
+ *
+ * @param tableConfig config of the table the sampler is created for
+ * @param samplerConfig the sampler's own config entry (name, type and properties)
+ * @param propertyStore Helix property store made available to the sampler
+ */
+ void init(TableConfig tableConfig, TableSamplerConfig samplerConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+ /**
+ * Selects a subset of segments from the provided online segments.
+ *
+ * <p>Implementations must not mutate the input set because the same pre-selected segment set can
+ * be reused by multiple samplers. Implementations must return a non-null set.
+ *
+ * @param onlineSegments the pre-selected online segment names to sample from
+ * @return the sampled segment names, never {@code null}
+ */
+ Set<String> sampleSegments(Set<String> onlineSegments);
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java
new file mode 100644
index 00000000000..1be8b236952
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableSamplerFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TableSamplerFactory.class);
+ private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages";
+ // Keep this list in sync with built-in TableSampler locations; additional
packages can be configured via broker
+ // config, but defaults should always include built-ins.
+ private static final List<String> DEFAULT_ANNOTATION_PACKAGES =
+ List.of("org.apache.pinot.broker.routing.tablesampler");
+ private static final Map<String, String> REGISTERED_TABLE_SAMPLER_CLASS_MAP
= new ConcurrentHashMap<>();
+
+ private TableSamplerFactory() {
+ }
+
+ public static void init(PinotConfiguration tableSamplerConfig) {
+ if (tableSamplerConfig == null) {
+ return;
+ }
+ registerAnnotatedTableSamplers(tableSamplerConfig);
+ }
+
+ public static void register(String alias, String className) {
+ if (StringUtils.isBlank(alias)) {
+ LOGGER.warn("Skipping table sampler registration because alias is
blank");
+ return;
+ }
+ if (StringUtils.isBlank(className)) {
+ LOGGER.warn("Skipping table sampler registration for alias '{}' because
class name is blank", alias);
+ return;
+ }
+ String normalizedAlias = normalizeType(alias);
+ String trimmedClassName = className.trim();
+ String previousClassName =
REGISTERED_TABLE_SAMPLER_CLASS_MAP.put(normalizedAlias, trimmedClassName);
+ if (previousClassName == null) {
+ LOGGER.info("Registered table sampler alias '{}' -> '{}'", alias,
trimmedClassName);
+ } else if (!previousClassName.equals(trimmedClassName)) {
+ LOGGER.warn("Overriding table sampler alias '{}' from '{}' to '{}'",
alias, previousClassName, trimmedClassName);
+ }
+ }
+
+ public static TableSampler create(String type) {
+ String resolvedClassName = resolveClassName(type);
+ String classNameToLoad = resolvedClassName != null ? resolvedClassName :
type;
+ try {
+ return PluginManager.get().createInstance(classNameToLoad);
+ } catch (Exception e) {
+ String errorMessage = resolvedClassName != null
+ ? String.format("Failed to create TableSampler for alias '%s' mapped
to class '%s'", type,
+ resolvedClassName)
+ : "Failed to create TableSampler for type: " + type;
+ throw new RuntimeException(errorMessage, e);
+ }
+ }
+
+ @VisibleForTesting
+ static void clearRegistry() {
+ REGISTERED_TABLE_SAMPLER_CLASS_MAP.clear();
+ }
+
+ /**
+  * Scans the default and configured annotation packages for classes annotated with {@link TableSamplerProvider}
+  * and registers each usable one under its annotation name.
+  *
+  * <p>A class is skipped when the annotation is missing or disabled, when it does not implement
+  * {@link TableSampler}, when it is not a public concrete class, or when the annotation name is blank.
+  *
+  * @param tableSamplerConfig configuration from which additional annotation packages are read
+  */
+ private static void registerAnnotatedTableSamplers(PinotConfiguration tableSamplerConfig) {
+   // Defaults go in first; the LinkedHashSet drops duplicates while keeping insertion order.
+   LinkedHashSet<String> packagesToScan = new LinkedHashSet<>(DEFAULT_ANNOTATION_PACKAGES);
+   for (String configured : getConfiguredAnnotationPackages(tableSamplerConfig)) {
+     if (!StringUtils.isBlank(configured)) {
+       packagesToScan.add(configured.trim());
+     }
+   }
+   if (packagesToScan.isEmpty()) {
+     LOGGER.info("No table sampler annotation packages configured");
+     return;
+   }
+   List<String> scanList = new ArrayList<>(packagesToScan);
+   Set<Class<?>> annotatedClasses =
+       PinotReflectionUtils.getClassesThroughReflection(scanList, ".*", TableSamplerProvider.class);
+   for (Class<?> candidate : annotatedClasses) {
+     TableSamplerProvider provider = candidate.getAnnotation(TableSamplerProvider.class);
+     if (provider == null || !provider.enabled()) {
+       continue;
+     }
+     String candidateName = candidate.getName();
+     int modifiers = candidate.getModifiers();
+     boolean publicConcrete = Modifier.isPublic(modifiers) && !Modifier.isAbstract(modifiers);
+     String alias = provider.name();
+     // The checks run in a fixed order so that each rejected class logs exactly one reason.
+     if (!TableSampler.class.isAssignableFrom(candidate)) {
+       LOGGER.warn("Skipping table sampler class '{}' because it does not implement TableSampler", candidateName);
+     } else if (!publicConcrete) {
+       LOGGER.warn("Skipping table sampler class '{}' because it is not a public concrete class", candidateName);
+     } else if (StringUtils.isBlank(alias)) {
+       LOGGER.warn("Skipping table sampler class '{}' because annotation name is blank", candidateName);
+     } else {
+       register(alias, candidateName);
+     }
+   }
+ }
+
+ /**
+  * Reads the comma-separated annotation package list from the sampler configuration.
+  *
+  * @param tableSamplerConfig configuration holding the {@code ANNOTATION_PACKAGES_KEY} property
+  * @return the trimmed, non-blank package names in configured order; an empty list when the property is unset or
+  *         blank
+  */
+ private static List<String> getConfiguredAnnotationPackages(PinotConfiguration tableSamplerConfig) {
+   String rawValue = tableSamplerConfig.getProperty(ANNOTATION_PACKAGES_KEY, "");
+   if (StringUtils.isBlank(rawValue)) {
+     return List.of();
+   }
+   List<String> parsed = new ArrayList<>();
+   for (String token : rawValue.split(",")) {
+     // Skip empty slots such as those produced by "a,,b" or a trailing comma.
+     if (StringUtils.isBlank(token)) {
+       continue;
+     }
+     parsed.add(token.trim());
+   }
+   return parsed;
+ }
+
+ /**
+  * Looks up the class name registered for the given alias.
+  *
+  * @param type sampler alias as supplied by the caller; may be {@code null} or blank
+  * @return the registered class name, or {@code null} when the type is blank or no alias is registered for it
+  */
+ private static String resolveClassName(String type) {
+   return StringUtils.isBlank(type) ? null : REGISTERED_TABLE_SAMPLER_CLASS_MAP.get(normalizeType(type));
+ }
+
+ /**
+  * Produces the registry key for a table sampler alias.
+  *
+  * <p>The alias is trimmed and then lower-cased with {@link Locale#ROOT}, so lookups that go through this method
+  * ignore surrounding whitespace and letter case.
+  *
+  * @param type the alias to normalize; must not be {@code null}
+  * @return the trimmed, lower-cased alias
+  */
+ private static String normalizeType(String type) {
+   String trimmed = type.trim();
+   return trimmed.toLowerCase(Locale.ROOT);
+ }
+}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
index e03153e2f08..59786554584 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/BrokerRoutingManagerTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.broker.routing.manager;
+import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
@@ -30,7 +32,17 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
+import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
+import
org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionMetadataManager;
+import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
+import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
+import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -49,6 +61,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -57,6 +70,7 @@ public class BrokerRoutingManagerTest {
private static final String SERVER_HOST = "localhost";
private static final int SERVER_PORT = 8000;
private static final String INSTANCE_CONFIGS_PATH = "/CONFIGS/PARTICIPANT";
+ private static final String TEST_TABLE = "testTable_OFFLINE";
private AutoCloseable _mocks;
@@ -204,6 +218,50 @@ public class BrokerRoutingManagerTest {
verify(_serverReenableCallback, never()).accept(any());
}
+ /**
+  * Installs a routing entry backed by mocked time-boundary and partition-metadata managers (with an empty sampler
+  * map) and checks that the routing manager hands back exactly the objects those managers return.
+  */
+ @Test
+ public void testSamplerContextSharesTimeBoundaryAndPartitionMetadata()
+ throws Exception {
+ TimeBoundaryManager timeBoundaryManager = mock(TimeBoundaryManager.class);
+ SegmentPartitionMetadataManager partitionMetadataManager =
mock(SegmentPartitionMetadataManager.class);
+ TimeBoundaryInfo expectedTimeBoundaryInfo = new
TimeBoundaryInfo("DaysSinceEpoch", "20000");
+ TablePartitionInfo expectedPartitionInfo =
+ new TablePartitionInfo(TEST_TABLE, "partitionCol", "Modulo", 2,
+ List.of(Collections.emptyList(), Collections.emptyList()),
Collections.emptyList());
+ TablePartitionReplicatedServersInfo expectedReplicatedServersInfo =
mock(TablePartitionReplicatedServersInfo.class);
+
when(timeBoundaryManager.getTimeBoundaryInfo()).thenReturn(expectedTimeBoundaryInfo);
+
when(partitionMetadataManager.getTablePartitionInfo()).thenReturn(expectedPartitionInfo);
+
when(partitionMetadataManager.getTablePartitionReplicatedServersInfo()).thenReturn(expectedReplicatedServersInfo);
+
+ // Register the entry directly in the routing map instead of going through the normal build path.
+ Object routingEntry = createRoutingEntry(TEST_TABLE, timeBoundaryManager,
partitionMetadataManager, Map.of());
+ putRoutingEntry(TEST_TABLE, routingEntry);
+
+ // assertSame: the very same instances must come back, not merely equal ones.
+ assertSame(_routingManager.getTimeBoundaryInfo(TEST_TABLE),
expectedTimeBoundaryInfo);
+ assertSame(_routingManager.getTablePartitionInfo(TEST_TABLE),
expectedPartitionInfo);
+
assertSame(_routingManager.getTablePartitionReplicatedServersInfo(TEST_TABLE),
expectedReplicatedServersInfo);
+ }
+
+ /**
+  * Builds a {@code BaseBrokerRoutingManager.RoutingEntry} through reflection.
+  *
+  * <p>The nested class is looked up by name and its declared constructor is made accessible before use
+  * (presumably because it is not visible to this test - confirm against the class declaration). Every collaborator
+  * other than the two managers passed in is a fresh mock; the {@code Long} argument is {@code null} and the
+  * {@code boolean} argument is {@code false}.
+  *
+  * @param tableNameWithType table name, also used to derive the ideal-state and external-view paths
+  * @param timeBoundaryManager time boundary manager to place in the entry
+  * @param partitionMetadataManager partition metadata manager to place in the entry
+  * @param samplerInfos value passed for the constructor's {@code Map} parameter
+  * @return the new routing entry, typed as {@code Object} because the class is resolved reflectively
+  * @throws Exception if the class or constructor cannot be resolved or invoked
+  */
+ private static Object createRoutingEntry(String tableNameWithType,
TimeBoundaryManager timeBoundaryManager,
+ SegmentPartitionMetadataManager partitionMetadataManager, Map<String, ?>
samplerInfos)
+ throws Exception {
+ Class<?> routingEntryClass =
Class.forName(BaseBrokerRoutingManager.class.getName() + "$RoutingEntry");
+ // The parameter type list must match the declared constructor exactly, or the lookup fails.
+ Constructor<?> constructor =
routingEntryClass.getDeclaredConstructor(String.class, String.class,
String.class,
+ SegmentPreSelector.class, SegmentSelector.class, List.class,
InstanceSelector.class, int.class, int.class,
+ SegmentZkMetadataFetcher.class, TimeBoundaryManager.class,
SegmentPartitionMetadataManager.class, Long.class,
+ Map.class, boolean.class);
+ constructor.setAccessible(true);
+ return constructor.newInstance(tableNameWithType, "/IDEALSTATES/" +
tableNameWithType,
+ "/EXTERNALVIEW/" + tableNameWithType, mock(SegmentPreSelector.class),
mock(SegmentSelector.class),
+ Collections.<SegmentPruner>emptyList(), mock(InstanceSelector.class),
1, 1,
+ mock(SegmentZkMetadataFetcher.class), timeBoundaryManager,
partitionMetadataManager, null, samplerInfos,
+ false);
+ }
+
+ /**
+  * Stores a reflectively created routing entry in the routing manager's entry map.
+  *
+  * <p>The map is accessed through a raw {@code Map} reference because the entry is only available as
+  * {@code Object} here; this is why the raw-type and unchecked warnings are suppressed.
+  */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void putRoutingEntry(String tableNameWithType, Object routingEntry) {
+ Map routingEntries = _routingManager._routingEntryMap;
+ routingEntries.put(tableNameWithType, routingEntry);
+ }
+
/**
* Creates a ZNRecord representing an enabled server instance.
*/
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
index f18cfbaf037..b734e2d635a 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManagerTest.java
@@ -198,9 +198,9 @@ public class MultiClusterRoutingManagerTest {
BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
List<String> remoteSegments = Arrays.asList("seg1");
-
when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null);
-
when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(remoteSegments);
-
when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null);
+ when(_localClusterRoutingManager.getSegments(brokerRequest,
null)).thenReturn(null);
+ when(_remoteClusterRoutingManager1.getSegments(brokerRequest,
null)).thenReturn(remoteSegments);
+ when(_remoteClusterRoutingManager2.getSegments(brokerRequest,
null)).thenReturn(null);
List<String> result =
_multiClusterRoutingManager.getSegments(brokerRequest);
@@ -212,15 +212,36 @@ public class MultiClusterRoutingManagerTest {
@Test
public void testGetSegmentsReturnsNullWhenAllNull() {
BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
-
when(_localClusterRoutingManager.getSegments(brokerRequest)).thenReturn(null);
-
when(_remoteClusterRoutingManager1.getSegments(brokerRequest)).thenReturn(null);
-
when(_remoteClusterRoutingManager2.getSegments(brokerRequest)).thenReturn(null);
+ when(_localClusterRoutingManager.getSegments(brokerRequest,
null)).thenReturn(null);
+ when(_remoteClusterRoutingManager1.getSegments(brokerRequest,
null)).thenReturn(null);
+ when(_remoteClusterRoutingManager2.getSegments(brokerRequest,
null)).thenReturn(null);
List<String> result =
_multiClusterRoutingManager.getSegments(brokerRequest);
assertNull(result);
}
+ /**
+  * Checks that a sampler name passed to the multi-cluster routing manager reaches every cluster routing manager,
+  * that non-null results are combined, and that the overload without a sampler name is never called.
+  */
+ @Test
+ public void testGetSegmentsWithSamplerName() {
+ BrokerRequest brokerRequest = createMockBrokerRequest(TEST_TABLE);
+ List<String> localSegments = Arrays.asList("localSeg");
+ List<String> remoteSegments = Arrays.asList("remoteSeg");
+
+ // The second remote cluster returns null for this request.
+ when(_localClusterRoutingManager.getSegments(brokerRequest,
"firstOnly")).thenReturn(localSegments);
+ when(_remoteClusterRoutingManager1.getSegments(brokerRequest,
"firstOnly")).thenReturn(remoteSegments);
+ when(_remoteClusterRoutingManager2.getSegments(brokerRequest,
"firstOnly")).thenReturn(null);
+
+ List<String> result =
_multiClusterRoutingManager.getSegments(brokerRequest, "firstOnly");
+
+ assertNotNull(result);
+ assertEquals(result.size(), 2);
+ assertTrue(result.contains("localSeg"));
+ assertTrue(result.contains("remoteSeg"));
+ // The sampler-aware overload must be used end to end.
+ verify(_localClusterRoutingManager, never()).getSegments(brokerRequest);
+ verify(_remoteClusterRoutingManager1, never()).getSegments(brokerRequest);
+ verify(_remoteClusterRoutingManager2, never()).getSegments(brokerRequest);
+ }
+
@Test
public void testGetEnabledServerInstanceMapCombinesAll() {
ServerInstance server1 = createMockServerInstance("server1");
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactoryTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactoryTest.java
new file mode 100644
index 00000000000..f3c575d2891
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactoryTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import
org.apache.pinot.broker.routing.tablesampler.external.ExternalAnnotatedSampler;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link TableSamplerFactory}: explicit alias registration, and registration driven by the
+ * {@link TableSamplerProvider} annotation for both the default and additionally configured packages.
+ */
+public class TableSamplerFactoryTest {
+
+ // Start every test from an empty registry so registrations do not leak between tests.
+ @BeforeMethod
+ public void setUp() {
+ TableSamplerFactory.clearRegistry();
+ }
+
+ /** An explicitly registered alias resolves to the registered class. */
+ @Test
+ public void testRegisterAndCreate() {
+ TableSamplerFactory.register("customFirst",
FirstNSegmentsTableSampler.class.getName());
+
+ TableSampler sampler = TableSamplerFactory.create("customFirst");
+
+ Assert.assertTrue(sampler instanceof FirstNSegmentsTableSampler);
+ }
+
+ /** With an empty configuration, the annotated nested sampler below is still found. */
+ @Test
+ public void testDefaultAnnotationRegistration() {
+ TableSamplerFactory.init(new PinotConfiguration());
+
+ TableSampler sampler = TableSamplerFactory.create("annotatedSampler");
+
+ Assert.assertTrue(sampler instanceof AnnotatedSampler);
+ }
+
+ /** Configuring an extra package that matches nothing must not stop the default packages from being scanned. */
+ @Test
+ public void testConfiguredPackagesDoNotDisableDefault() {
+ PinotConfiguration config = new PinotConfiguration(
+ Map.of(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX +
".annotation.packages", "com.acme.missing"));
+
+
TableSamplerFactory.init(config.subset(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX));
+
+ TableSampler sampler = TableSamplerFactory.create("annotatedSampler");
+
+ Assert.assertTrue(sampler instanceof AnnotatedSampler);
+ }
+
+ /** A sampler living in an additionally configured package becomes available under its annotation name. */
+ @Test
+ public void testConfiguredPackagesLoadExternalSampler() {
+ PinotConfiguration config = new PinotConfiguration(Map.of(
+ CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX +
".annotation.packages",
+ "org.apache.pinot.broker.routing.tablesampler.external"));
+
+
TableSamplerFactory.init(config.subset(CommonConstants.Broker.TABLE_SAMPLER_CONFIG_PREFIX));
+
+ TableSampler sampler =
TableSamplerFactory.create(ExternalAnnotatedSampler.TYPE);
+
+ Assert.assertTrue(sampler instanceof ExternalAnnotatedSampler);
+ }
+
+ /** Pass-through sampler used as the annotation-scan target; it returns the online segments unchanged. */
+ @TableSamplerProvider(name = "annotatedSampler")
+ public static class AnnotatedSampler implements TableSampler {
+ @Override
+ public void init(TableConfig tableConfig, TableSamplerConfig samplerConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ }
+
+ @Override
+ public Set<String> sampleSegments(Set<String> onlineSegments) {
+ return onlineSegments;
+ }
+ }
+}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/external/ExternalAnnotatedSampler.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/external/ExternalAnnotatedSampler.java
new file mode 100644
index 00000000000..0a30a8a2210
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/tablesampler/external/ExternalAnnotatedSampler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler.external;
+
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.tablesampler.TableSampler;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+/**
+ * Test-only {@link TableSampler} registered under the alias {@link #TYPE} through the
+ * {@link TableSamplerProvider} annotation. It needs no initialization and returns the online segments unchanged.
+ */
+@TableSamplerProvider(name = ExternalAnnotatedSampler.TYPE)
+public class ExternalAnnotatedSampler implements TableSampler {
+ /** Alias under which this sampler is registered. */
+ public static final String TYPE = "externalAnnotatedSampler";
+
+ /** No-op: this sampler keeps no state. */
+ @Override
+ public void init(TableConfig tableConfig, TableSamplerConfig samplerConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ }
+
+ /** Returns the given set as-is, i.e. performs no sampling. */
+ @Override
+ public Set<String> sampleSegments(Set<String> onlineSegments) {
+ return onlineSegments;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 6cd5f1da334..ede03f8e73b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -41,7 +41,6 @@ import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOv
* Utils to parse query options.
*/
public class QueryOptionsUtils {
-
private QueryOptionsUtils() {
}
@@ -111,6 +110,14 @@ public class QueryOptionsUtils {
return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS,
timeoutMsString);
}
+ @Nullable
+ public static String getTableSampler(@Nullable Map<String, String>
queryOptions) {
+ if (queryOptions == null || queryOptions.isEmpty()) {
+ return null;
+ }
+ return queryOptions.get(QueryOptionKey.TABLE_SAMPLER);
+ }
+
@Nullable
public static Long getExtraPassiveTimeoutMs(Map<String, String>
queryOptions) {
String extraPassiveTimeoutMsString =
queryOptions.get(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
index 2575c2f6b8c..28f0100e0b8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
@@ -45,6 +45,7 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -176,10 +177,17 @@ public class TableConfigSerDeUtils {
});
}
+ List<TableSamplerConfig> tableSamplerConfigs = null;
+ String tableSamplerConfigsString =
simpleFields.get(TableConfig.TABLE_SAMPLERS_KEY);
+ if (tableSamplerConfigsString != null) {
+ tableSamplerConfigs =
JsonUtils.stringToObject(tableSamplerConfigsString, new TypeReference<>() {
+ });
+ }
+
return new TableConfig(tableName, tableType, validationConfig,
tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig,
instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList,
isDimTable, tunerConfigList,
- instancePartitionsMap, segmentAssignmentConfigMap);
+ instancePartitionsMap, segmentAssignmentConfigMap,
tableSamplerConfigs);
}
public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -254,6 +262,10 @@ public class TableConfigSerDeUtils {
simpleFields.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY,
JsonUtils.objectToString(segmentAssignmentConfigMap));
}
+ List<TableSamplerConfig> tableSamplerConfigs =
tableConfig.getTableSamplers();
+ if (tableSamplerConfigs != null) {
+ simpleFields.put(TableConfig.TABLE_SAMPLERS_KEY,
JsonUtils.objectToString(tableSamplerConfigs));
+ }
ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
znRecord.setSimpleFields(simpleFields);
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index 11d09427e59..6547db2880a 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -29,6 +29,7 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
@@ -61,6 +62,20 @@ public class QueryOptionsUtilsTest {
assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false");
}
+ /** An upper-case {@code SAMPLER} option key must be resolved to the canonical {@code TABLE_SAMPLER} key. */
+ @Test
+ public void shouldResolveSamplerOptionCaseInsensitively() {
+ Map<String, String> resolved =
QueryOptionsUtils.resolveCaseInsensitiveOptions(Map.of("SAMPLER", "firstOnly"));
+
+ assertEquals(resolved.get(TABLE_SAMPLER), "firstOnly");
+ }
+
+ /** Covers the three input shapes of {@code getTableSampler}: option present, empty map, and {@code null} map. */
+ @Test
+ public void shouldExtractTableSamplerOption() {
+ assertEquals(QueryOptionsUtils.getTableSampler(Map.of(TABLE_SAMPLER,
"firstOnly")), "firstOnly");
+ assertNull(QueryOptionsUtils.getTableSampler(Map.of()));
+ assertNull(QueryOptionsUtils.getTableSampler(null));
+ }
+
@Test
public void shouldReadIgnoreMissingSegmentsOption() {
// Given:
diff --git
a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
index 41f86e4d2db..460b00d9757 100644
---
a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
+++
b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataWriter.scala
@@ -185,7 +185,7 @@ class PinotDataWriter[InternalRow](
new TableCustomConfig(null),
null, null, null, null, null, null, null,
null, null, null, null, false, null, null,
- null)
+ null, null)
val segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig,
pinotSchema)
segmentGeneratorConfig.setTableName(tableName)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index e0e00cdd04e..5f1150c25cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -83,6 +83,15 @@ public interface RoutingManager {
@Nullable
List<String> getSegments(BrokerRequest brokerRequest);
+ /**
+ * Returns the segments that are relevant for the given broker request and optional sampler name.
+ * Returns {@code null} if the table does not exist.
+ *
+ * <p>The default implementation ignores {@code samplerName} and delegates to
+ * {@link #getSegments(BrokerRequest)}; implementations that support table samplers need to override it.
+ *
+ * @param brokerRequest the broker request to route
+ * @param samplerName name of the table sampler to apply, or {@code null} for none
+ */
+ @Nullable
+ default List<String> getSegments(BrokerRequest brokerRequest, @Nullable
String samplerName) {
+ return getSegments(brokerRequest);
+ }
+
/**
* Validate routing exist for a table
*
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java
new file mode 100644
index 00000000000..819da87ee0f
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for the {@code sampler} query option.
+ *
+ * <p>The test data consists of {@code DAYS * SEGMENTS_PER_DAY} Avro files with {@code RECORDS_PER_SEGMENT} record
+ * each (presumably one segment per file - the assertions on {@code numSegmentsQueried} rely on that). The offline
+ * table is partitioned on {@code PartitionKey} with the Modulo function and 2 partitions, uses the partition
+ * segment pruner, and declares two samplers of type {@code firstN}: {@code firstOnly} ({@code numSegments=1}) and
+ * {@code firstTwo} ({@code numSegments=2}).
+ */
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TableSamplerIntegrationTest extends
CustomDataQueryClusterIntegrationTest {
+ private static final int DAYS = 7;
+ private static final int SEGMENTS_PER_DAY = 10;
+ private static final int RECORDS_PER_SEGMENT = 1;
+ // Value of the time column for the first day; later days count up from here.
+ private static final int BASE_DAY = 20000;
+
+ private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch";
+ private static final String PARTITION_KEY_COL = "PartitionKey";
+
+ @Override
+ public String getTableName() {
+ return "TableSamplerIntegrationTest";
+ }
+
+ // Total number of records across all generated files.
+ @Override
+ protected long getCountStarResult() {
+ return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT;
+ }
+
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT,
"1:DAYS:EPOCH", "1:DAYS")
+ .addSingleValueDimension(PARTITION_KEY_COL, FieldSpec.DataType.INT)
+ .build();
+ }
+
+ // Offline table with Modulo(2) partitioning, partition pruning, and the two firstN samplers under test.
+ @Override
+ public TableConfig createOfflineTableConfig() {
+ Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new
HashMap<>();
+ columnPartitionConfigMap.put(PARTITION_KEY_COL, new
ColumnPartitionConfig("Modulo", 2));
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
+ .setTimeColumnName(DAYS_SINCE_EPOCH_COL)
+ .setTimeType("DAYS")
+ .setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
+ .setRoutingConfig(
+ new RoutingConfig(null,
List.of(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null, null))
+ .build();
+ tableConfig.setTableSamplers(List.of(
+ new TableSamplerConfig("firstOnly", "firstN", Map.of("numSegments",
"1")),
+ new TableSamplerConfig("firstTwo", "firstN", Map.of("numSegments",
"2"))));
+ return tableConfig;
+ }
+
+ // Writes one Avro file per (day, segment) pair; all records of a day share the day value and partition key.
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ var fieldAssembler = SchemaBuilder.record("myRecord").fields();
+ fieldAssembler.name(DAYS_SINCE_EPOCH_COL).type().intType().noDefault();
+ fieldAssembler.name(PARTITION_KEY_COL).type().intType().noDefault();
+ var avroSchema = fieldAssembler.endRecord();
+
+ List<File> files = new ArrayList<>();
+ for (int day = 0; day < DAYS; day++) {
+ int dayValue = BASE_DAY + day;
+ // Partition key alternates by day: even day offsets get 0, odd day offsets get 1.
+ int partitionKey = day % 2;
+ for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {
+ File avroFile = new File(_tempDir, "data_day_" + day + "_seg_" + seg +
".avro");
+ try (DataFileWriter<GenericData.Record> fileWriter =
+ new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+ for (int docId = 0; docId < RECORDS_PER_SEGMENT; docId++) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(DAYS_SINCE_EPOCH_COL, dayValue);
+ record.put(PARTITION_KEY_COL, partitionKey);
+ fileWriter.append(record);
+ }
+ }
+ files.add(avroFile);
+ }
+ }
+ return files;
+ }
+
+ /**
+  * Compares an unsampled group-by-day query with the same query under the {@code firstOnly} and {@code firstTwo}
+  * samplers, checking both the returned counts and {@code numSegmentsQueried}.
+  */
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testFirstNSamplerForGroupByDay(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ // Baseline without a sampler: one row per day, every segment queried.
+ JsonNode full = postQuery("SELECT DaysSinceEpoch, COUNT(*) AS cnt FROM " +
getTableName()
+ + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch");
+ JsonNode fullRows = full.path("resultTable").path("rows");
+ Assert.assertEquals(fullRows.size(), DAYS);
+ for (int i = 0; i < DAYS; i++) {
+ Assert.assertEquals(fullRows.get(i).get(0).asInt(), BASE_DAY + i);
+ Assert.assertEquals(fullRows.get(i).get(1).asLong(), (long)
SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+ }
+ Assert.assertEquals(full.path("numSegmentsQueried").asInt(), DAYS *
SEGMENTS_PER_DAY);
+
+ // NOTE(review): expecting BASE_DAY here relies on which segment firstN picks; that logic is not visible here.
+ JsonNode sampled = postQuery(withSampler("SELECT DaysSinceEpoch, COUNT(*)
AS cnt FROM " + getTableName()
+ + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch", "firstOnly"));
+ JsonNode sampledRows = sampled.path("resultTable").path("rows");
+ Assert.assertEquals(sampledRows.size(), 1);
+ Assert.assertEquals(sampledRows.get(0).get(0).asInt(), BASE_DAY);
+ Assert.assertEquals(sampledRows.get(0).get(1).asLong(), (long)
RECORDS_PER_SEGMENT);
+ Assert.assertEquals(sampled.path("numSegmentsQueried").asInt(), 1);
+
+ // For firstTwo only the total count is checked, so the two segments may fall on one day or on two.
+ JsonNode sampledTwo = postQuery(withSampler("SELECT DaysSinceEpoch,
COUNT(*) AS cnt FROM " + getTableName()
+ + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch", "firstTwo"));
+ JsonNode sampledTwoRows = sampledTwo.path("resultTable").path("rows");
+ long sampledTwoCount = 0L;
+ for (JsonNode row : sampledTwoRows) {
+ sampledTwoCount += row.get(1).asLong();
+ }
+ Assert.assertEquals(sampledTwoCount, 2L * RECORDS_PER_SEGMENT);
+ Assert.assertEquals(sampledTwo.path("numSegmentsQueried").asInt(), 2);
+ }
+
+ /**
+  * Checks that the partition filter still takes effect when a sampler is used: the sampled query for partition
+  * key 1 returns a count of 0, and on the single-stage engine reports 0 segments queried.
+  */
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testSamplerRoutingStillAppliesPartitionPruning(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ // Day offsets 1, 3 and 5 carry partition key 1, hence three days' worth of records.
+ JsonNode full = postQuery("SELECT COUNT(*) AS cnt FROM " + getTableName()
+ " WHERE " + PARTITION_KEY_COL + " = 1");
+
Assert.assertEquals(full.path("resultTable").path("rows").get(0).get(0).asLong(),
+ 3L * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+
+ // NOTE(review): the 0 below implies neither sampled segment holds partition key 1; depends on firstN's choice.
+ JsonNode sampled = postQuery(
+ withSampler("SELECT COUNT(*) AS cnt FROM " + getTableName() + " WHERE
" + PARTITION_KEY_COL + " = 1",
+ "firstTwo"));
+
Assert.assertEquals(sampled.path("resultTable").path("rows").get(0).get(0).asLong(),
0L);
+ if (!useMultiStageQueryEngine) {
+ Assert.assertEquals(sampled.path("numSegmentsQueried").asInt(), 0);
+ }
+ }
+
+ // Prefixes the query with a SET statement that selects the named sampler.
+ private static String withSampler(String query, String samplerName) {
+ return "SET sampler='" + samplerName + "'; " + query;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 7446bb2e88f..a65d1cfcfc6 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -34,11 +34,13 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions;
import org.apache.pinot.calcite.rel.rules.TableOptions;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.LogicalTableRouteInfo;
import org.apache.pinot.core.routing.LogicalTableRouteProvider;
import org.apache.pinot.core.routing.RoutingManager;
@@ -445,7 +447,7 @@ public class WorkerManager {
Map<String, String> tableOptions = metadata.getTableOptions();
if (tableOptions != null) {
if
(Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED)))
{
- setSegmentsForReplicatedLeafFragment(metadata);
+ setSegmentsForReplicatedLeafFragment(metadata, context);
return;
}
@@ -485,7 +487,8 @@ public class WorkerManager {
private void
assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata metadata,
DispatchablePlanContext context) {
String tableName = metadata.getScannedTables().get(0);
- Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName,
context.getRequestId());
+ Map<String, RoutingTable> routingTableMap =
+ getRoutingTable(tableName, context.getRequestId(),
context.getPlannerContext().getOptions());
Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find
routing entries for table: %s", tableName);
// acquire time boundary info if it is a hybrid table.
@@ -553,40 +556,52 @@ public class WorkerManager {
* @return keyed-map from table type(s) to routing table(s).
*/
private Map<String, RoutingTable> getRoutingTable(String tableName, long
requestId) {
+ return getRoutingTable(tableName, requestId, Map.of());
+ }
+
+ private Map<String, RoutingTable> getRoutingTable(String tableName, long
requestId,
+ Map<String, String> queryOptions) {
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == null) {
// Raw table name
Map<String, RoutingTable> routingTableMap = new HashMap<>(4);
RoutingTable offlineRoutingTable =
-
getRoutingTableHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
requestId);
+
getRoutingTableHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
requestId, queryOptions);
if (offlineRoutingTable != null) {
routingTableMap.put(TableType.OFFLINE.name(), offlineRoutingTable);
}
RoutingTable realtimeRoutingTable =
-
getRoutingTableHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName),
requestId);
+
getRoutingTableHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName),
requestId, queryOptions);
if (realtimeRoutingTable != null) {
routingTableMap.put(TableType.REALTIME.name(), realtimeRoutingTable);
}
return routingTableMap;
} else {
// Table name with type
- RoutingTable routingTable = getRoutingTableHelper(tableName, requestId);
+ RoutingTable routingTable = getRoutingTableHelper(tableName, requestId,
queryOptions);
return routingTable != null ? Map.of(tableType.name(), routingTable) :
Map.of();
}
}
@Nullable
- private RoutingTable getRoutingTableHelper(String tableNameWithType, long
requestId) {
- return _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\""), requestId);
+ private RoutingTable getRoutingTableHelper(String tableNameWithType, long
requestId,
+ Map<String, String> queryOptions) {
+ BrokerRequest brokerRequest =
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\"");
+ if (MapUtils.isNotEmpty(queryOptions) && brokerRequest.isSetPinotQuery()) {
+ // Ensure query options (e.g. sampler) are visible to routing selection.
+ brokerRequest.getPinotQuery().setQueryOptions(new
HashMap<>(queryOptions));
+ }
+ return _routingManager.getRoutingTable(brokerRequest, requestId);
}
// --------------------------------------------------------------------------
// Replicated non-partitioned leaf stage assignment
// --------------------------------------------------------------------------
- private void setSegmentsForReplicatedLeafFragment(DispatchablePlanMetadata
metadata) {
+ private void setSegmentsForReplicatedLeafFragment(DispatchablePlanMetadata
metadata,
+ DispatchablePlanContext context) {
String tableName = metadata.getScannedTables().get(0);
- Map<String, List<String>> segmentsMap = getSegments(tableName);
+ Map<String, List<String>> segmentsMap = getSegments(tableName,
context.getPlannerContext().getOptions());
Preconditions.checkState(!segmentsMap.isEmpty(), "Unable to find segments
for table: %s", tableName);
// Acquire time boundary info if it is a hybrid table.
@@ -609,31 +624,34 @@ public class WorkerManager {
* Returns the segments for the given table, keyed by table type.
* TODO: It doesn't handle unavailable segments.
*/
- private Map<String, List<String>> getSegments(String tableName) {
+ private Map<String, List<String>> getSegments(String tableName, Map<String,
String> queryOptions) {
+ String samplerName = MapUtils.isNotEmpty(queryOptions) ?
QueryOptionsUtils.getTableSampler(queryOptions) : null;
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == null) {
// Raw table name
Map<String, List<String>> segmentsMap = new HashMap<>(4);
- List<String> offlineSegments =
setSegmentsHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+ List<String> offlineSegments =
+
setSegmentsHelper(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
samplerName);
if (CollectionUtils.isNotEmpty(offlineSegments)) {
segmentsMap.put(TableType.OFFLINE.name(), offlineSegments);
}
- List<String> realtimeSegments =
setSegmentsHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+ List<String> realtimeSegments =
+
setSegmentsHelper(TableNameBuilder.REALTIME.tableNameWithType(tableName),
samplerName);
if (CollectionUtils.isNotEmpty(realtimeSegments)) {
segmentsMap.put(TableType.REALTIME.name(), realtimeSegments);
}
return segmentsMap;
} else {
// Table name with type
- List<String> segments = setSegmentsHelper(tableName);
+ List<String> segments = setSegmentsHelper(tableName, samplerName);
return CollectionUtils.isNotEmpty(segments) ? Map.of(tableType.name(),
segments) : Map.of();
}
}
@Nullable
- private List<String> setSegmentsHelper(String tableNameWithType) {
+ private List<String> setSegmentsHelper(String tableNameWithType, @Nullable
String samplerName) {
return _routingManager.getSegments(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\""));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\""), samplerName);
}
private void
assignWorkersToNonPartitionedLeafFragmentForLogicalTable(DispatchablePlanMetadata
metadata,
@@ -647,15 +665,22 @@ public class WorkerManager {
}
BrokerRequest offlineBrokerRequest = null;
BrokerRequest realtimeBrokerRequest = null;
+ Map<String, String> queryOptions =
context.getPlannerContext().getOptions();
if (logicalTableRouteInfo.hasOffline()) {
offlineBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
"SELECT * FROM \"" + logicalTableRouteInfo.getOfflineTableName() +
"\"");
+ if (MapUtils.isNotEmpty(queryOptions) &&
offlineBrokerRequest.isSetPinotQuery()) {
+ offlineBrokerRequest.getPinotQuery().setQueryOptions(new
HashMap<>(queryOptions));
+ }
}
if (logicalTableRouteInfo.hasRealtime()) {
realtimeBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
"SELECT * FROM \"" + logicalTableRouteInfo.getRealtimeTableName() +
"\"");
+ if (MapUtils.isNotEmpty(queryOptions) &&
realtimeBrokerRequest.isSetPinotQuery()) {
+ realtimeBrokerRequest.getPinotQuery().setQueryOptions(new
HashMap<>(queryOptions));
+ }
}
tableRouteProvider.calculateRoutes(logicalTableRouteInfo, _routingManager,
offlineBrokerRequest,
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
index 140da3d22c5..a739e183e70 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
@@ -80,7 +80,7 @@ public class CLPForwardIndexCreatorTest implements
PinotBuffersAfterMethodCheckR
TableConfig tableConfig =
new TableConfig("mytable", TableType.REALTIME.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig(null, null, null), new IndexingConfig(), new
TableCustomConfig(null), null, null, null,
- null, null, null, null, null, null, null, null, false, null, null,
null);
+ null, null, null, null, null, null, null, null, false, null, null,
null, null);
List<FieldConfig> fieldConfigList = new ArrayList<>();
fieldConfigList.add(new FieldConfig("column1",
FieldConfig.EncodingType.RAW, Collections.EMPTY_LIST,
FieldConfig.CompressionCodec.CLP, Collections.EMPTY_MAP));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index ba2495355d3..35a1c6208c6 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -3224,7 +3224,7 @@ public class TableConfigUtilsTest {
new TableConfig("table", TableType.OFFLINE.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
null, null, null, null, Map.of("OFFLINE", config), null, null,
null, null, null, null, false, null, null,
- null);
+ null, null);
// Should not throw
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -3240,7 +3240,7 @@ public class TableConfigUtilsTest {
new TableConfig("table", TableType.REALTIME.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
null, null, null, null, Map.of("CONSUMING", config), null, null,
null, null, null, null, false, null, null,
- null);
+ null, null);
// Should not throw
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -3251,7 +3251,7 @@ public class TableConfigUtilsTest {
TableConfig tableConfig =
new TableConfig("table", TableType.OFFLINE.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
- null, null, null, null, null, null, null, null, null, null, null,
false, null, null, null);
+ null, null, null, null, null, null, null, null, null, null, null,
false, null, null, null, null);
assertThrows(IllegalStateException.class,
() ->
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3262,7 +3262,7 @@ public class TableConfigUtilsTest {
TableConfig tableConfig =
new TableConfig("table", TableType.REALTIME.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
- null, null, null, null, null, null, null, null, null, null, null,
false, null, null, null);
+ null, null, null, null, null, null, null, null, null, null, null,
false, null, null, null, null);
assertThrows(IllegalStateException.class,
() ->
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3278,7 +3278,7 @@ public class TableConfigUtilsTest {
new TableConfig("table", TableType.OFFLINE.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
null, null, null, null, Map.of("OFFLINE", config), null, null,
null, null, null, null, false, null, null,
- null);
+ null, null);
assertThrows(IllegalStateException.class,
() ->
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3294,7 +3294,7 @@ public class TableConfigUtilsTest {
new TableConfig("table", TableType.REALTIME.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
null, null, null, null, Map.of("CONSUMING", config), null, null,
null, null, null, null, false, null, null,
- null);
+ null, null);
assertThrows(IllegalStateException.class,
() ->
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3310,7 +3310,7 @@ public class TableConfigUtilsTest {
new TableConfig("table", TableType.OFFLINE.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
null, null, null, null, Map.of("OFFLINE", config), null, null,
null, null, null, null, false, null, null,
- null);
+ null, null);
assertThrows(IllegalStateException.class,
() ->
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3326,7 +3326,7 @@ public class TableConfigUtilsTest {
new TableConfig("table", TableType.REALTIME.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
null, null, null, null, Map.of("CONSUMING", config), null, null,
null, null, null, null, false, null, null,
- null);
+ null, null);
assertThrows(IllegalStateException.class,
() ->
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig));
@@ -3337,7 +3337,7 @@ public class TableConfigUtilsTest {
TableConfig tableConfig =
new TableConfig("table", TableType.OFFLINE.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
- null, null, null, null, null, null, null, null, null, null, null,
true, null, null, null);
+ null, null, null, null, null, null, null, null, null, null, null,
true, null, null, null, null);
// Should not throw
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -3350,7 +3350,8 @@ public class TableConfigUtilsTest {
TableConfig tableConfig =
new TableConfig("table", TableType.OFFLINE.name(), new
SegmentsValidationAndRetentionConfig(),
new TenantConfig("DefaultTenant", "DefaultTenant", null), new
IndexingConfig(), new TableCustomConfig(null),
- null, null, null, null, null, null, null, null, null, null, null,
true, null, instancePartitionsMap, null);
+ null, null, null, null, null, null, null, null, null, null, null,
true, null, instancePartitionsMap, null,
+ null);
// Should not throw
TableConfigUtils.validateInstancePoolsAndReplicaGroups(tableConfig);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/tablesampler/TableSamplerProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/tablesampler/TableSamplerProvider.java
new file mode 100644
index 00000000000..21f499fc26b
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/tablesampler/TableSamplerProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.annotations.tablesampler;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Annotation for table sampler providers.
+ *
+ * NOTE:
+ * - The annotated class must implement {@code org.apache.pinot.broker.routing.tablesampler.TableSampler}.
+ * - The annotated class must be public and concrete with a no-arg constructor.
+ * - The class must be discoverable via the packages configured with
+ * {@code pinot.broker.table.sampler.annotation.packages}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TableSamplerProvider {
+
+ /**
+ * Alias name for the sampler (used in table config).
+ */
+ String name();
+
+ boolean enabled() default true;
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 6f0e132fa7e..5df33cc7b92 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -23,14 +23,19 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.BaseJsonConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -58,6 +63,7 @@ public class TableConfig extends BaseJsonConfig {
public static final String TIER_CONFIGS_LIST_KEY = "tierConfigs";
public static final String TUNER_CONFIG_LIST_KEY = "tunerConfigs";
public static final String TIER_OVERWRITES_KEY = "tierOverwrites";
+ public static final String TABLE_SAMPLERS_KEY = "tableSamplers";
// Double underscore is reserved for real-time segment name delimiter
public static final String TABLE_NAME_FORBIDDEN_SUBSTRING = "__";
@@ -113,6 +119,9 @@ public class TableConfig extends BaseJsonConfig {
@JsonPropertyDescription(value = "Configs for Table config tuner")
private List<TunerConfig> _tunerConfigList;
+ @JsonPropertyDescription(value = "Configs for table samplers")
+ private List<TableSamplerConfig> _tableSamplers;
+
@JsonCreator
public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true)
String tableName,
@JsonProperty(value = TABLE_TYPE_KEY, required = true) String tableType,
@@ -138,7 +147,8 @@ public class TableConfig extends BaseJsonConfig {
@JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY) @Nullable
Map<InstancePartitionsType, String> instancePartitionsMap,
@JsonProperty(SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
- Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap) {
+ Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap,
+ @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List<TableSamplerConfig>
tableSamplers) {
Preconditions.checkArgument(tableName != null, "'tableName' must be
configured");
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
"'tableName' cannot contain double underscore ('__')");
@@ -169,6 +179,7 @@ public class TableConfig extends BaseJsonConfig {
_tunerConfigList = tunerConfigList;
_instancePartitionsMap = instancePartitionsMap;
_segmentAssignmentConfigMap = segmentAssignmentConfigMap;
+ _tableSamplers = sanitizeAndValidateTableSamplers(tableSamplers);
}
public TableConfig(TableConfig tableConfig) {
@@ -193,6 +204,7 @@ public class TableConfig extends BaseJsonConfig {
_tunerConfigList = tableConfig.getTunerConfigsList();
_instancePartitionsMap = tableConfig.getInstancePartitionsMap();
_segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
+ _tableSamplers =
sanitizeAndValidateTableSamplers(tableConfig.getTableSamplers());
}
@JsonProperty(TABLE_NAME_KEY)
@@ -254,6 +266,46 @@ public class TableConfig extends BaseJsonConfig {
_customConfig = customConfig;
}
+ @JsonProperty(TABLE_SAMPLERS_KEY)
+ @Nullable
+ public List<TableSamplerConfig> getTableSamplers() {
+ return _tableSamplers;
+ }
+
+ public void setTableSamplers(@Nullable List<TableSamplerConfig>
tableSamplers) {
+ _tableSamplers = sanitizeAndValidateTableSamplers(tableSamplers);
+ }
+
+ @Nullable
+ private static List<TableSamplerConfig> sanitizeAndValidateTableSamplers(
+ @Nullable List<TableSamplerConfig> tableSamplers) {
+ if (tableSamplers == null || tableSamplers.isEmpty()) {
+ return null;
+ }
+ List<TableSamplerConfig> sanitized = new ArrayList<>(tableSamplers.size());
+ Set<String> seenNormalizedNames = new HashSet<>();
+ for (TableSamplerConfig config : tableSamplers) {
+ if (config != null) {
+ String name = config.getName();
+ if (name != null) {
+ String trimmedName = name.trim();
+ if (trimmedName.isEmpty()) {
+ throw new IllegalArgumentException("Table sampler name cannot be
blank");
+ }
+ String normalizedName = trimmedName.toLowerCase(Locale.ROOT);
+ if (!seenNormalizedNames.add(normalizedName)) {
+ throw new IllegalArgumentException("Duplicate table sampler name:
" + trimmedName);
+ }
+ }
+ sanitized.add(config);
+ }
+ }
+ if (sanitized.isEmpty()) {
+ return null;
+ }
+ return sanitized;
+ }
+
@JsonProperty(QUOTA_CONFIG_KEY)
@Nullable
public QuotaConfig getQuotaConfig() {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java
new file mode 100644
index 00000000000..bf804c173c3
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.sampler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configuration for a table sampler.
+ *
+ * Samplers are defined in {@link org.apache.pinot.spi.config.table.TableConfig} and can be selected at query time
+ * via a query option. The sampler type can be one of the built-in types, a fully qualified class name, or an alias
+ * discovered via broker-side annotation scanning. Additional annotation packages can be configured via
+ * {@code pinot.broker.table.sampler.annotation.packages}.
+ */
+public class TableSamplerConfig extends BaseJsonConfig {
+ private final String _name;
+ private final String _type;
+ private final Map<String, String> _properties;
+
+ @JsonCreator
+ public TableSamplerConfig(@JsonProperty(value = "name", required = true)
String name,
+ @JsonProperty(value = "type", required = true) String type,
+ @JsonProperty("properties") @Nullable Map<String, String> properties) {
+ _name = name;
+ _type = type;
+ _properties = properties;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public String getType() {
+ return _type;
+ }
+
+ @Nullable
+ public Map<String, String> getProperties() {
+ return _properties;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2c97a55df07..bc05f2c7720 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -336,6 +336,9 @@ public class CommonConstants {
public static final String ACCESS_CONTROL_CONFIG_PREFIX =
"pinot.broker.access.control";
public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
public static final String EVENT_LISTENER_CONFIG_PREFIX =
"pinot.broker.event.listener";
+ // Prefix for table sampler configs:
+ // - pinot.broker.table.sampler.annotation.packages=<comma-separated packages>
+ public static final String TABLE_SAMPLER_CONFIG_PREFIX =
"pinot.broker.table.sampler";
public static final String CONFIG_OF_METRICS_NAME_PREFIX =
"pinot.broker.metrics.prefix";
public static final String DEFAULT_METRICS_NAME_PREFIX = "pinot.broker.";
@@ -647,6 +650,7 @@ public class CommonConstants {
public static final String USE_STAR_TREE = "useStarTree";
public static final String SCAN_STAR_TREE_NODES = "scanStarTreeNodes";
public static final String ROUTING_OPTIONS = "routingOptions";
+ public static final String TABLE_SAMPLER = "sampler";
public static final String USE_SCAN_REORDER_OPTIMIZATION =
"useScanReorderOpt";
public static final String MAX_EXECUTION_THREADS =
"maxExecutionThreads";
public static final String COLLECT_GC_STATS = "collectGCStats";
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 07d518fb4b7..52bc8023f7e 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -51,6 +51,7 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
public class TableConfigBuilder {
@@ -125,6 +126,7 @@ public class TableConfigBuilder {
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
+ private List<TableSamplerConfig> _tableSamplers;
private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
private Map<InstancePartitionsType, String> _instancePartitionsMap;
private Map<String, SegmentAssignmentConfig> _segmentAssignmentConfigMap;
@@ -412,6 +414,11 @@ public class TableConfigBuilder {
return this;
}
+ public TableConfigBuilder setTableSamplers(List<TableSamplerConfig>
tableSamplers) {
+ _tableSamplers = tableSamplers;
+ return this;
+ }
+
public TableConfigBuilder setInstanceAssignmentConfigMap(
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
@@ -538,6 +545,6 @@ public class TableConfigBuilder {
return new TableConfig(_tableName, _tableType.toString(),
validationConfig, tenantConfig, indexingConfig,
_customConfig, _quotaConfig, _taskConfig, _routingConfig,
_queryConfig, _instanceAssignmentConfigMap,
_fieldConfigList, _upsertConfig, _dedupConfig, _dimensionTableConfig,
_ingestionConfig, _tierConfigList,
- _isDimTable, _tunerConfigList, _instancePartitionsMap,
_segmentAssignmentConfigMap);
+ _isDimTable, _tunerConfigList, _instancePartitionsMap,
_segmentAssignmentConfigMap, _tableSamplers);
}
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
index 4143b7ce2df..82119214e7a 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/TableConfigTest.java
@@ -27,8 +27,10 @@ import java.util.Map;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -104,4 +106,36 @@ public class TableConfigTest {
assertEquals(config, copy);
assertEquals(config.toJsonString(), copy.toJsonString());
}
+
+ @Test
+ public void testDuplicateTableSamplerNamesRejected() {
+ TableConfig config = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ List<TableSamplerConfig> duplicateSamplers = List.of(
+ new TableSamplerConfig("sampler1", "firstN", Map.of("numSegments",
"10")),
+ new TableSamplerConfig("sampler1", "firstN", Map.of("numSegments",
"1")));
+ IllegalArgumentException e =
Assert.expectThrows(IllegalArgumentException.class,
+ () -> config.setTableSamplers(duplicateSamplers));
+ assertTrue(e.getMessage().contains("Duplicate table sampler name:
sampler1"));
+ }
+
+ @Test
+ public void testDuplicateTableSamplerNamesRejectedAfterNormalization() {
+ TableConfig config = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ List<TableSamplerConfig> duplicateSamplers = List.of(
+ new TableSamplerConfig("sampler1", "firstN", Map.of("numSegments",
"10")),
+ new TableSamplerConfig(" Sampler1 ", "firstN", Map.of("numSegments",
"1")));
+ IllegalArgumentException e =
Assert.expectThrows(IllegalArgumentException.class,
+ () -> config.setTableSamplers(duplicateSamplers));
+ assertTrue(e.getMessage().contains("Duplicate table sampler name:
Sampler1"));
+ }
+
+ @Test
+ public void testBlankTableSamplerNameRejected() {
+ TableConfig config = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ List<TableSamplerConfig> samplers =
+ List.of(new TableSamplerConfig(" ", "firstN", Map.of("numSegments",
"10")));
+ IllegalArgumentException e =
+ Assert.expectThrows(IllegalArgumentException.class, () ->
config.setTableSamplers(samplers));
+ assertTrue(e.getMessage().contains("Table sampler name cannot be blank"));
+ }
}
diff --git
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index 5558a95609a..80e59c59493 100644
---
a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++
b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -108,6 +108,15 @@
"metadata": {
"customConfigs": {}
},
+ "tableSamplers": [
+ {
+ "name": "small",
+ "type": "firstN",
+ "properties": {
+ "numSegments": "1"
+ }
+ }
+ ],
"ingestionConfig": {
"transformConfigs": [
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]