Copilot commented on code in PR #17532:
URL: https://github.com/apache/pinot/pull/17532#discussion_r2768928080


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/FirstNSegmentsTableSampler.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+
+/**
+ * Selects the first N segments after sorting segment names lexicographically.
+ *
+ * <p>Config:
+ * <ul>
+ *   <li>{@code properties.numSegments}: positive integer</li>
+ * </ul>
+ */
+@TableSamplerProvider(name = FirstNSegmentsTableSampler.TYPE)
+public class FirstNSegmentsTableSampler implements TableSampler {
+  public static final String TYPE = "firstN";
+  public static final String PROP_NUM_SEGMENTS = "numSegments";
+
+  private int _numSegments;
+
+  @Override
+  public void init(String tableNameWithType, TableConfig tableConfig, 
TableSamplerConfig samplerConfig,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    Map<String, String> props = samplerConfig.getProperties();
+    if (MapUtils.isEmpty(props) || !props.containsKey(PROP_NUM_SEGMENTS)) {
+      throw new IllegalArgumentException(
+          "Missing required property '" + PROP_NUM_SEGMENTS + "' for table 
sampler type '" + TYPE + "'");
+    }
+    _numSegments = Integer.parseInt(props.get(PROP_NUM_SEGMENTS));

Review Comment:
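   Integer.parseInt throws a NumberFormatException when the configured value 
is not a valid integer, and that exception is not caught here, so the 
resulting error does not identify the offending property. Wrap the call in a 
try-catch and rethrow with a clear message that names the property, the 
invalid value, and the expected format.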
   ```suggestion
       String numSegmentsString = props.get(PROP_NUM_SEGMENTS);
       try {
         _numSegments = Integer.parseInt(numSegmentsString);
       } catch (NumberFormatException e) {
         throw new IllegalArgumentException(
             "Invalid value for '" + PROP_NUM_SEGMENTS + "': '" + 
numSegmentsString + "'. Expected a positive integer.",
             e);
       }
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java:
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TableSamplerIntegrationTest extends 
CustomDataQueryClusterIntegrationTest {
+  private static final int DAYS = 7;
+  private static final int SEGMENTS_PER_DAY = 10;
+  private static final int RECORDS_PER_SEGMENT = 1;
+  private static final int BASE_DAY = 20000;
+
+  private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch";
+
+  @Override
+  public String getTableName() {
+    return "TableSamplerIntegrationTest";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT, 
"1:DAYS:EPOCH", "1:DAYS")
+        .build();
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
+        .setTimeColumnName(DAYS_SINCE_EPOCH_COL)
+        .setTimeType("DAYS")
+        .build();
+    tableConfig.setTableSamplers(List.of(
+        new TableSamplerConfig("firstOnly", "firstN", Map.of("numSegments", 
"1")),
+        new TableSamplerConfig("firstTwo", "firstN", Map.of("numSegments", 
"2"))));
+    return tableConfig;
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    var fieldAssembler = SchemaBuilder.record("myRecord").fields();
+    fieldAssembler.name(DAYS_SINCE_EPOCH_COL).type().intType().noDefault();
+    var avroSchema = fieldAssembler.endRecord();
+
+    List<File> files = new ArrayList<>();
+    for (int day = 0; day < DAYS; day++) {
+      int dayValue = BASE_DAY + day;
+      for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {

Review Comment:
   The file naming pattern "data_day_X_seg_Y.avro" creates segments that sort 
lexicographically as intended, but this dependency on naming for test 
correctness is fragile. Consider adding a comment explaining this requirement 
or using a more explicit segment ordering mechanism.
   ```suggestion
         for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {
           // NOTE: The table sampler tests rely on the lexicographic ordering 
of segment names.
           // Keep the "data_day_<day>_seg_<seg>.avro" pattern so segments sort 
by day, then by segment index.
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1087,10 +1277,59 @@ private Map<ServerInstance, SegmentsToQuery> 
getServerInstanceToSegmentsMap(Stri
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
-    RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+    RoutingEntry routingEntry = getRoutingEntry(brokerRequest, 
tableNameWithType);
     return routingEntry != null ? routingEntry.getSegments(brokerRequest) : 
null;
   }
 
+  @Nullable
+  private RoutingEntry getRoutingEntry(BrokerRequest brokerRequest, String 
tableNameWithType) {
+    String samplerName = extractSamplerName(brokerRequest);
+    if (StringUtils.isNotBlank(samplerName)) {
+      Map<String, RoutingEntry> samplerEntries = 
_samplerRoutingEntryMap.get(tableNameWithType);
+      RoutingEntry samplerEntry = samplerEntries != null ? 
samplerEntries.get(samplerName) : null;

Review Comment:
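   The sampler name lookup is case-sensitive, so a query that specifies the 
sampler name with different casing than the name in the table config will not 
match the configured sampler. Consider normalizing the sampler name (e.g., to 
lowercase) both when the entries are stored and before lookup to avoid 
mismatches.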



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -254,6 +265,39 @@ public void setCustomConfig(TableCustomConfig 
customConfig) {
     _customConfig = customConfig;
   }
 
+  @JsonProperty(TABLE_SAMPLERS_KEY)
+  @Nullable
+  public List<TableSamplerConfig> getTableSamplers() {
+    return _tableSamplers;
+  }
+
+  public void setTableSamplers(@Nullable List<TableSamplerConfig> 
tableSamplers) {
+    _tableSamplers = sanitizeAndValidateTableSamplers(tableSamplers);
+  }
+
+  @Nullable
+  private static List<TableSamplerConfig> sanitizeAndValidateTableSamplers(
+      @Nullable List<TableSamplerConfig> tableSamplers) {
+    if (tableSamplers == null || tableSamplers.isEmpty()) {
+      return null;
+    }
+    List<TableSamplerConfig> sanitized = new ArrayList<>(tableSamplers.size());
+    Set<String> seenNames = new HashSet<>();
+    for (TableSamplerConfig config : tableSamplers) {
+      if (config != null) {
+        String name = config.getName();
+        if (name != null && !seenNames.add(name)) {
+          throw new IllegalArgumentException("Duplicate table sampler name: " 
+ name);

Review Comment:
   Validate that the sampler name is not blank (empty or whitespace-only). 
Such a name passes the null check but is effectively empty, and because blank 
names of different lengths (e.g., one space vs. two spaces) are distinct 
strings, the duplicate check would allow several "blank" samplers.
   ```suggestion
           if (name != null) {
             if (name.trim().isEmpty()) {
               throw new IllegalArgumentException("Table sampler name cannot be 
blank");
             }
             if (!seenNames.add(name)) {
               throw new IllegalArgumentException("Duplicate table sampler 
name: " + name);
             }
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableSamplerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableSamplerFactory.class);
+  private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages";
+  // Keep this list in sync with built-in TableSampler locations; additional 
packages can be configured via broker
+  // config, but defaults should always include built-ins.
+  private static final List<String> DEFAULT_ANNOTATION_PACKAGES =
+      List.of("org.apache.pinot.broker.routing.tablesampler");
+  private static final Map<String, String> REGISTERED_TABLE_SAMPLER_CLASS_MAP 
= new ConcurrentHashMap<>();
+
+  private TableSamplerFactory() {
+  }
+
+  public static void init(PinotConfiguration tableSamplerConfig) {
+    if (tableSamplerConfig == null) {
+      return;
+    }
+    registerAnnotatedTableSamplers(tableSamplerConfig);
+  }
+
+  public static void register(String alias, String className) {
+    if (StringUtils.isBlank(alias)) {
+      LOGGER.warn("Skipping table sampler registration because alias is 
blank");
+      return;
+    }
+    if (StringUtils.isBlank(className)) {
+      LOGGER.warn("Skipping table sampler registration for alias '{}' because 
class name is blank", alias);
+      return;
+    }
+    String normalizedAlias = normalizeType(alias);
+    String trimmedClassName = className.trim();
+    String previousClassName = 
REGISTERED_TABLE_SAMPLER_CLASS_MAP.put(normalizedAlias, trimmedClassName);
+    if (previousClassName == null) {
+      LOGGER.info("Registered table sampler alias '{}' -> '{}'", alias, 
trimmedClassName);
+    } else if (!previousClassName.equals(trimmedClassName)) {
+      LOGGER.warn("Overriding table sampler alias '{}' from '{}' to '{}'", 
alias, previousClassName, trimmedClassName);
+    }
+  }
+
+  public static TableSampler create(String type) {
+    String resolvedClassName = resolveClassName(type);
+    String classNameToLoad = resolvedClassName != null ? resolvedClassName : 
type;
+    try {
+      return PluginManager.get().createInstance(classNameToLoad);
+    } catch (Exception e) {
+      String errorMessage = resolvedClassName != null
+          ? String.format("Failed to create TableSampler for alias '%s' mapped 
to class '%s'", type,
+              resolvedClassName)
+          : "Failed to create TableSampler for type: " + type;
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRegistry() {
+    REGISTERED_TABLE_SAMPLER_CLASS_MAP.clear();
+  }
+
+  private static void registerAnnotatedTableSamplers(PinotConfiguration 
tableSamplerConfig) {
+    List<String> configuredPackages = 
tableSamplerConfig.getProperty(ANNOTATION_PACKAGES_KEY, List.of());
+    LinkedHashSet<String> combinedPackages = new 
LinkedHashSet<>(DEFAULT_ANNOTATION_PACKAGES);
+    if (configuredPackages != null) {
+      for (String packageName : configuredPackages) {
+        if (StringUtils.isNotBlank(packageName)) {
+          combinedPackages.add(packageName.trim());
+        }
+      }
+    }
+    List<String> sanitizedPackages = new ArrayList<>(combinedPackages);
+    if (sanitizedPackages.isEmpty()) {
+      LOGGER.info("No table sampler annotation packages configured");
+      return;
+    }
+    Set<Class<?>> samplerClasses =
+        PinotReflectionUtils.getClassesThroughReflection(sanitizedPackages, 
".*", TableSamplerProvider.class);
+    for (Class<?> samplerClass : samplerClasses) {
+      TableSamplerProvider annotation = 
samplerClass.getAnnotation(TableSamplerProvider.class);
+      if (annotation == null || !annotation.enabled()) {
+        continue;
+      }
+      if (!TableSampler.class.isAssignableFrom(samplerClass)) {
+        LOGGER.warn("Skipping table sampler class '{}' because it does not 
implement TableSampler",
+            samplerClass.getName());
+        continue;
+      }
+      if (!Modifier.isPublic(samplerClass.getModifiers()) || 
Modifier.isAbstract(samplerClass.getModifiers())) {
+        LOGGER.warn("Skipping table sampler class '{}' because it is not a 
public concrete class",
+            samplerClass.getName());
+        continue;
+      }
+      String alias = annotation.name();
+      if (StringUtils.isBlank(alias)) {
+        LOGGER.warn("Skipping table sampler class '{}' because annotation name 
is blank", samplerClass.getName());
+        continue;
+      }
+      register(alias, samplerClass.getName());
+    }
+  }
+
+  private static String resolveClassName(String type) {
+    if (StringUtils.isBlank(type)) {
+      return null;
+    }
+    return REGISTERED_TABLE_SAMPLER_CLASS_MAP.get(normalizeType(type));
+  }
+

Review Comment:
   The normalizeType method converts sampler type aliases to lowercase (it is 
applied on both registration and lookup), but this behavior is not documented 
in TableSamplerConfig or TableSamplerProvider. Add documentation explaining 
that sampler type matching is case-insensitive.
   ```suggestion
   
     /**
      * Normalizes a table sampler type/alias for use in the internal registry.
      * <p>
      * Both registration and lookup go through this method, so sampler names 
are effectively
      * matched in a case-insensitive manner: the configured name is trimmed 
and converted to
      * lower case using {@link Locale#ROOT} before being used as a key.
      */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to