Copilot commented on code in PR #17532:
URL: https://github.com/apache/pinot/pull/17532#discussion_r2768450026
########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java: ##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TableSamplerIntegrationTest extends CustomDataQueryClusterIntegrationTest {
+  private static final int DAYS = 7;
+  private static final int SEGMENTS_PER_DAY = 10;
+  private static final int RECORDS_PER_SEGMENT = 1;
+  private static final int BASE_DAY = 20000;
+
+  private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch";
+
+  @Override
+  public String getTableName() {
+    return "TableSamplerIntegrationTest";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .build();
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
+        .setTimeColumnName(DAYS_SINCE_EPOCH_COL)
+        .setTimeType("DAYS")
+        .build();
+    tableConfig.setTableSamplers(List.of(
+        new TableSamplerConfig("firstOnly", "firstN", Map.of("numSegments", "1"))));
+    return tableConfig;
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(List.of(
+        new org.apache.avro.Schema.Field(DAYS_SINCE_EPOCH_COL,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null, null)));
+
+    List<File> files = new ArrayList<>();
+    for (int day = 0; day < DAYS; day++) {
+      int dayValue = BASE_DAY + day;
+      for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {
+        File avroFile = new File(_tempDir, "data_day_" + day + "_seg_" + seg + ".avro");
+        try (DataFileWriter<GenericData.Record> fileWriter =
+            new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+          fileWriter.create(avroSchema, avroFile);
+          for (int docId = 0; docId < RECORDS_PER_SEGMENT; docId++) {
+            GenericData.Record record = new GenericData.Record(avroSchema);
+            record.put(DAYS_SINCE_EPOCH_COL, dayValue);
+            fileWriter.append(record);
+          }
+        }
+        files.add(avroFile);
+      }
+    }
+    return files;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFirstNSamplerForGroupByDay(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    JsonNode full = postQuery("SELECT DaysSinceEpoch, COUNT(*) AS cnt FROM " + getTableName()
+        + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch");
+    JsonNode fullRows = full.path("resultTable").path("rows");
+    Assert.assertEquals(fullRows.size(), DAYS);
+    for (int i = 0; i < DAYS; i++) {
+      Assert.assertEquals(fullRows.get(i).get(0).asInt(), BASE_DAY + i);
+      Assert.assertEquals(fullRows.get(i).get(1).asLong(), (long) SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+    }
+    Assert.assertEquals(Integer.parseInt(full.path("numSegmentsQueried").asText()), DAYS * SEGMENTS_PER_DAY);
+
+    JsonNode sampled = postQueryWithOptions("SELECT DaysSinceEpoch, COUNT(*) AS cnt FROM " + getTableName()
+        + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch",
+        "tableSampler=firstOnly");

Review Comment:
   The test only validates the `firstN` sampler with `numSegments=1`. Consider adding coverage for other scenarios, such as `numSegments` greater than 1, and potentially for the `timeBucket` sampler mentioned in the PR description.
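For illustration only, a follow-up scenario along these lines could look like the sketch below. It reuses helpers that already appear in this diff (`postQueryWithOptions`, `TableSamplerConfig`, `getTableName()`), but the sampler name `firstThree`, the `numSegments` value of 3, and the bound asserted on `numSegmentsQueried` are assumptions about how a larger `firstN` configuration would behave, not code from the PR.

```java
  // Sketch only: assumes createOfflineTableConfig() also registers a second sampler, e.g.
  //   new TableSamplerConfig("firstThree", "firstN", Map.of("numSegments", "3"))
  @Test(dataProvider = "useBothQueryEngines")
  public void testFirstNSamplerWithMultipleSegments(boolean useMultiStageQueryEngine)
      throws Exception {
    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
    JsonNode sampled = postQueryWithOptions(
        "SELECT COUNT(*) AS cnt FROM " + getTableName(), "tableSampler=firstThree");
    // A firstN sampler configured with numSegments=3 should route to at most 3 segments.
    Assert.assertTrue(Integer.parseInt(sampled.path("numSegmentsQueried").asText()) <= 3);
  }
```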
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java: ##########
@@ -1087,10 +1277,37 @@ private Map<ServerInstance, SegmentsToQuery> getServerInstanceToSegmentsMap(Stri
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
-    RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+    RoutingEntry routingEntry = getRoutingEntry(brokerRequest, tableNameWithType);
     return routingEntry != null ? routingEntry.getSegments(brokerRequest) : null;
   }
 
+  @Nullable
+  private RoutingEntry getRoutingEntry(BrokerRequest brokerRequest, String tableNameWithType) {
+    String samplerName = extractSamplerName(brokerRequest);
+    if (StringUtils.isNotBlank(samplerName)) {
+      Map<String, RoutingEntry> samplerEntries = _samplerRoutingEntryMap.get(tableNameWithType);
+      RoutingEntry samplerEntry = samplerEntries != null ? samplerEntries.get(samplerName) : null;
+      if (samplerEntry != null) {
+        return samplerEntry;
+      }
+      LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling back to default routing entry",
+          samplerName, tableNameWithType);
+    }
+    return _routingEntryMap.get(tableNameWithType);
+  }
+

Review Comment:
   This warning is logged on every query when the requested sampler is not found, which could generate excessive log volume if a misconfigured sampler name is used repeatedly. Consider rate-limiting the warning, or logging it at debug level after the first occurrence.

```suggestion
      if (MissingSamplerWarningTracker.shouldLogWarn(tableNameWithType, samplerName)) {
        LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling back to default routing entry",
            samplerName, tableNameWithType);
      } else if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(
            "Requested sampler '{}' not found for table '{}'; falling back to default routing entry "
                + "(subsequent occurrences logged at DEBUG level only)",
            samplerName, tableNameWithType);
      }
    }
    return _routingEntryMap.get(tableNameWithType);
  }

  /**
   * Tracks sampler/table combinations that have already emitted a warning, to avoid excessive logs
   * when a misconfigured sampler name is used repeatedly.
   */
  private static final class MissingSamplerWarningTracker {
    private static final Map<String, Set<String>> TABLE_TO_SAMPLERS = new ConcurrentHashMap<>();

    private MissingSamplerWarningTracker() {
    }

    static boolean shouldLogWarn(String tableNameWithType, String samplerName) {
      Set<String> samplers = TABLE_TO_SAMPLERS.computeIfAbsent(tableNameWithType, k -> ConcurrentHashMap.newKeySet());
      // Returns true only the first time we see this sampler for the given table.
      return samplers.add(samplerName);
    }
  }
```

########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java: ##########
@@ -254,6 +265,38 @@ public void setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
   }
 
+  @JsonProperty(TABLE_SAMPLERS_KEY)
+  @Nullable
+  public List<TableSamplerConfig> getTableSamplers() {
+    return _tableSamplers;
+  }
+
+  public void setTableSamplers(@Nullable List<TableSamplerConfig> tableSamplers) {
+    _tableSamplers = sanitizeTableSamplers(tableSamplers);
+  }
+
+  @Nullable
+  private static List<TableSamplerConfig> sanitizeTableSamplers(@Nullable List<TableSamplerConfig> tableSamplers) {

Review Comment:
   The method name `sanitizeTableSamplers` doesn't clearly indicate that it also validates for duplicates. Consider renaming it to `validateAndSanitizeTableSamplers` or `sanitizeAndValidateTableSamplers` to better reflect its dual purpose.

```suggestion
    _tableSamplers = validateAndSanitizeTableSamplers(tableSamplers);
  }

  @Nullable
  private static List<TableSamplerConfig> validateAndSanitizeTableSamplers(@Nullable List<TableSamplerConfig> tableSamplers) {
```

########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java: ##########
@@ -820,6 +940,69 @@ private void buildRoutingInternal(String tableNameWithType) {
       LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
     }
 
+    // Build routing entries for configured table samplers (if any).
+    // These entries use the same underlying routing components, but operate on a deterministically sampled set of
+    // segments selected during routing build.
+    List<TableSamplerConfig> tableSamplerConfigs = tableConfig.getTableSamplers();
+    if (tableSamplerConfigs != null && !tableSamplerConfigs.isEmpty()) {
+      Map<String, RoutingEntry> samplerRoutingEntries = new HashMap<>();
+      for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+        String samplerName = samplerConfig.getName();
+        String samplerType = samplerConfig.getType();
+        if (StringUtils.isBlank(samplerName) || StringUtils.isBlank(samplerType)) {
+          LOGGER.warn("Skipping invalid table sampler config for table: {}, samplerName: {}, samplerType: {}",
+              tableNameWithType, samplerName, samplerType);
+          continue;
+        }
+        try {
+          TableSampler sampler = TableSamplerFactory.create(samplerType);
+          sampler.init(tableNameWithType, tableConfig, samplerConfig, _propertyStore);
+
+          SegmentPreSelector samplerSegmentPreSelector =
+              new TableSamplerSegmentPreSelector(segmentPreSelector, sampler);
+          Set<String> samplerPreSelectedOnlineSegments =
+              // SegmentPreSelector contract allows mutation of the input set. Avoid passing the shared
+              // onlineSegments.

Review Comment:
   The comment describes the mutation contract, but it doesn't explain why a copy is being made. Consider expanding it to clarify that this is a defensive copy that protects the shared `onlineSegments` set from being mutated by the sampler, since that set is reused for other samplers.

```suggestion
              // SegmentPreSelector contract allows mutation of the input set. Create a defensive copy here so
              // the sampler cannot mutate the shared onlineSegments set, which is reused by other samplers.
```

########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java: ##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Modifier;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableSamplerFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableSamplerFactory.class);
+  private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages";
+  private static final List<String> DEFAULT_ANNOTATION_PACKAGES =
+      List.of("org.apache.pinot.broker.routing.tablesampler");
+  private static final Map<String, String> REGISTERED_TABLE_SAMPLER_CLASS_MAP = new ConcurrentHashMap<>();
+
+  private TableSamplerFactory() {
+  }
+
+  public static void init(PinotConfiguration tableSamplerConfig) {
+    if (tableSamplerConfig == null) {
+      return;
+    }
+    registerAnnotatedTableSamplers(tableSamplerConfig);
+  }
+
+  public static void register(String alias, String className) {
+    if (StringUtils.isBlank(alias)) {
+      LOGGER.warn("Skipping table sampler registration because alias is blank");
+      return;
+    }
+    if (StringUtils.isBlank(className)) {
+      LOGGER.warn("Skipping table sampler registration for alias '{}' because class name is blank", alias);
+      return;
+    }
+    String normalizedAlias = normalizeType(alias);
+    String trimmedClassName = className.trim();
+    String previousClassName = REGISTERED_TABLE_SAMPLER_CLASS_MAP.put(normalizedAlias, trimmedClassName);
+    if (previousClassName == null) {
+      LOGGER.info("Registered table sampler alias '{}' -> '{}'", alias, trimmedClassName);
+    } else if (!previousClassName.equals(trimmedClassName)) {
+      LOGGER.warn("Overriding table sampler alias '{}' from '{}' to '{}'", alias, previousClassName, trimmedClassName);
+    }
+  }
+
+  public static TableSampler create(String type) {
+    String resolvedClassName = resolveClassName(type);
+    String classNameToLoad = resolvedClassName != null ? resolvedClassName : type;
+    try {
+      return PluginManager.get().createInstance(classNameToLoad);
+    } catch (Exception e) {
+      String errorMessage = resolvedClassName != null
+          ? String.format("Failed to create TableSampler for alias '%s' mapped to class '%s'", type,
+              resolvedClassName) : "Failed to create TableSampler for type: " + type;
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRegistry() {
+    REGISTERED_TABLE_SAMPLER_CLASS_MAP.clear();
+  }
+
+  private static void registerAnnotatedTableSamplers(PinotConfiguration tableSamplerConfig) {
+    List<String> configuredPackages = tableSamplerConfig.getProperty(ANNOTATION_PACKAGES_KEY, List.of());
+    LinkedHashSet<String> combinedPackages = new LinkedHashSet<>(DEFAULT_ANNOTATION_PACKAGES);
+    if (configuredPackages != null) {
+      for (String packageName : configuredPackages) {
+        if (StringUtils.isNotBlank(packageName)) {
+          combinedPackages.add(packageName.trim());
+        }
+      }
+    }
+    List<String> sanitizedPackages = combinedPackages.stream().collect(Collectors.toList());

Review Comment:
   `combinedPackages` is already a `LinkedHashSet`, which maintains insertion order, so converting it to a `List` via a stream is unnecessary. Consider using `new ArrayList<>(combinedPackages)` for a more direct conversion.
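A minimal sketch of the simplification being suggested, assuming nothing else depends on the stream pipeline; note that the class would also need a `java.util.ArrayList` import, which is not in the diff's current import list.

```java
    // LinkedHashSet already preserves insertion order, so a copy constructor is sufficient.
    List<String> sanitizedPackages = new ArrayList<>(combinedPackages);
```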
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]