Copilot commented on code in PR #17532:
URL: https://github.com/apache/pinot/pull/17532#discussion_r2768450026


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TableSamplerIntegrationTest extends 
CustomDataQueryClusterIntegrationTest {
+  private static final int DAYS = 7;
+  private static final int SEGMENTS_PER_DAY = 10;
+  private static final int RECORDS_PER_SEGMENT = 1;
+  private static final int BASE_DAY = 20000;
+
+  private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch";
+
+  @Override
+  public String getTableName() {
+    return "TableSamplerIntegrationTest";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT, 
"1:DAYS:EPOCH", "1:DAYS")
+        .build();
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
+        .setTimeColumnName(DAYS_SINCE_EPOCH_COL)
+        .setTimeType("DAYS")
+        .build();
+    tableConfig.setTableSamplers(List.of(
+        new TableSamplerConfig("firstOnly", "firstN", Map.of("numSegments", 
"1"))));
+    return tableConfig;
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(List.of(
+        new org.apache.avro.Schema.Field(DAYS_SINCE_EPOCH_COL,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), 
null, null)));
+
+    List<File> files = new ArrayList<>();
+    for (int day = 0; day < DAYS; day++) {
+      int dayValue = BASE_DAY + day;
+      for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {
+        File avroFile = new File(_tempDir, "data_day_" + day + "_seg_" + seg + 
".avro");
+        try (DataFileWriter<GenericData.Record> fileWriter =
+            new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+          fileWriter.create(avroSchema, avroFile);
+          for (int docId = 0; docId < RECORDS_PER_SEGMENT; docId++) {
+            GenericData.Record record = new GenericData.Record(avroSchema);
+            record.put(DAYS_SINCE_EPOCH_COL, dayValue);
+            fileWriter.append(record);
+          }
+        }
+        files.add(avroFile);
+      }
+    }
+    return files;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFirstNSamplerForGroupByDay(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    JsonNode full = postQuery("SELECT DaysSinceEpoch, COUNT(*) AS cnt FROM " + 
getTableName()
+        + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch");
+    JsonNode fullRows = full.path("resultTable").path("rows");
+    Assert.assertEquals(fullRows.size(), DAYS);
+    for (int i = 0; i < DAYS; i++) {
+      Assert.assertEquals(fullRows.get(i).get(0).asInt(), BASE_DAY + i);
+      Assert.assertEquals(fullRows.get(i).get(1).asLong(), (long) 
SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+    }
+    
Assert.assertEquals(Integer.parseInt(full.path("numSegmentsQueried").asText()), 
DAYS * SEGMENTS_PER_DAY);
+
+    JsonNode sampled = postQueryWithOptions("SELECT DaysSinceEpoch, COUNT(*) 
AS cnt FROM " + getTableName()
+            + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch",
+        "tableSampler=firstOnly");

Review Comment:
   The test only validates the `firstN` sampler with `numSegments=1`. Consider 
adding test coverage for other scenarios, such as `numSegments > 1`, and 
potentially for the `timeBucket` sampler mentioned in the PR description.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1087,10 +1277,37 @@ private Map<ServerInstance, SegmentsToQuery> 
getServerInstanceToSegmentsMap(Stri
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
-    RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+    RoutingEntry routingEntry = getRoutingEntry(brokerRequest, 
tableNameWithType);
     return routingEntry != null ? routingEntry.getSegments(brokerRequest) : 
null;
   }
 
+  @Nullable
+  private RoutingEntry getRoutingEntry(BrokerRequest brokerRequest, String 
tableNameWithType) {
+    String samplerName = extractSamplerName(brokerRequest);
+    if (StringUtils.isNotBlank(samplerName)) {
+      Map<String, RoutingEntry> samplerEntries = 
_samplerRoutingEntryMap.get(tableNameWithType);
+      RoutingEntry samplerEntry = samplerEntries != null ? 
samplerEntries.get(samplerName) : null;
+      if (samplerEntry != null) {
+        return samplerEntry;
+      }
+      LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling 
back to default routing entry",
+          samplerName, tableNameWithType);
+    }
+    return _routingEntryMap.get(tableNameWithType);
+  }
+

Review Comment:
   This warning is logged on every query when a sampler is not found, which 
could generate excessive logs if a misconfigured sampler name is used 
repeatedly. Consider rate-limiting this warning or logging it at debug level 
after the first occurrence.
   ```suggestion
         if (MissingSamplerWarningTracker.shouldLogWarn(tableNameWithType, 
samplerName)) {
           LOGGER.warn("Requested sampler '{}' not found for table '{}'; 
falling back to default routing entry",
               samplerName, tableNameWithType);
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
               "Requested sampler '{}' not found for table '{}'; falling back 
to default routing entry "
                   + "(subsequent occurrences logged at DEBUG level only)",
               samplerName, tableNameWithType);
         }
       }
       return _routingEntryMap.get(tableNameWithType);
     }
   
     /**
      * Tracks sampler/table combinations that have already emitted a warning, 
to avoid excessive logs
      * when a misconfigured sampler name is used repeatedly.
      */
     private static final class MissingSamplerWarningTracker {
   
       private static final Map<String, Set<String>> TABLE_TO_SAMPLERS = new 
ConcurrentHashMap<>();
   
       private MissingSamplerWarningTracker() {
       }
   
       static boolean shouldLogWarn(String tableNameWithType, String 
samplerName) {
         Set<String> samplers =
             TABLE_TO_SAMPLERS.computeIfAbsent(tableNameWithType, k -> 
ConcurrentHashMap.newKeySet());
         // Returns true only the first time we see this sampler for the given 
table.
         return samplers.add(samplerName);
       }
     }
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -254,6 +265,38 @@ public void setCustomConfig(TableCustomConfig 
customConfig) {
     _customConfig = customConfig;
   }
 
+  @JsonProperty(TABLE_SAMPLERS_KEY)
+  @Nullable
+  public List<TableSamplerConfig> getTableSamplers() {
+    return _tableSamplers;
+  }
+
+  public void setTableSamplers(@Nullable List<TableSamplerConfig> 
tableSamplers) {
+    _tableSamplers = sanitizeTableSamplers(tableSamplers);
+  }
+
+  @Nullable
+  private static List<TableSamplerConfig> sanitizeTableSamplers(@Nullable 
List<TableSamplerConfig> tableSamplers) {

Review Comment:
   The method name `sanitizeTableSamplers` doesn't clearly indicate that it 
also checks for duplicate sampler names. Consider renaming it to 
`validateAndSanitizeTableSamplers` or `sanitizeAndValidateTableSamplers` to 
better reflect its dual purpose.
   ```suggestion
       _tableSamplers = validateAndSanitizeTableSamplers(tableSamplers);
     }
   
     @Nullable
     private static List<TableSamplerConfig> 
validateAndSanitizeTableSamplers(@Nullable List<TableSamplerConfig> 
tableSamplers) {
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -820,6 +940,69 @@ private void buildRoutingInternal(String 
tableNameWithType) {
         LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
       }
 
+      // Build routing entries for configured table samplers (if any).
+      // These entries use the same underlying routing components, but operate 
on a deterministically sampled set of
+      // segments selected during routing build.
+      List<TableSamplerConfig> tableSamplerConfigs = 
tableConfig.getTableSamplers();
+      if (tableSamplerConfigs != null && !tableSamplerConfigs.isEmpty()) {
+        Map<String, RoutingEntry> samplerRoutingEntries = new HashMap<>();
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || 
StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, 
samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableNameWithType, tableConfig, samplerConfig, 
_propertyStore);
+
+            SegmentPreSelector samplerSegmentPreSelector =
+                new TableSamplerSegmentPreSelector(segmentPreSelector, 
sampler);
+            Set<String> samplerPreSelectedOnlineSegments =
+                // SegmentPreSelector contract allows mutation of the input 
set. Avoid passing the shared
+                // onlineSegments.

Review Comment:
   The comment states that the contract allows mutation of the input set, but 
it doesn't explain why a copy is created. Consider expanding the comment to 
clarify that the copy is defensive: it protects the shared `onlineSegments` 
set, which is reused for other samplers, from being mutated by this sampler.
   ```suggestion
                   // SegmentPreSelector contract allows mutation of the input 
set. Create a defensive copy here so
                   // the sampler cannot mutate the shared onlineSegments set, 
which is reused by other samplers.
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Modifier;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableSamplerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableSamplerFactory.class);
+  private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages";
+  private static final List<String> DEFAULT_ANNOTATION_PACKAGES =
+      List.of("org.apache.pinot.broker.routing.tablesampler");
+  private static final Map<String, String> REGISTERED_TABLE_SAMPLER_CLASS_MAP 
= new ConcurrentHashMap<>();
+
+  private TableSamplerFactory() {
+  }
+
+  public static void init(PinotConfiguration tableSamplerConfig) {
+    if (tableSamplerConfig == null) {
+      return;
+    }
+    registerAnnotatedTableSamplers(tableSamplerConfig);
+  }
+
+  public static void register(String alias, String className) {
+    if (StringUtils.isBlank(alias)) {
+      LOGGER.warn("Skipping table sampler registration because alias is 
blank");
+      return;
+    }
+    if (StringUtils.isBlank(className)) {
+      LOGGER.warn("Skipping table sampler registration for alias '{}' because 
class name is blank", alias);
+      return;
+    }
+    String normalizedAlias = normalizeType(alias);
+    String trimmedClassName = className.trim();
+    String previousClassName = 
REGISTERED_TABLE_SAMPLER_CLASS_MAP.put(normalizedAlias, trimmedClassName);
+    if (previousClassName == null) {
+      LOGGER.info("Registered table sampler alias '{}' -> '{}'", alias, 
trimmedClassName);
+    } else if (!previousClassName.equals(trimmedClassName)) {
+      LOGGER.warn("Overriding table sampler alias '{}' from '{}' to '{}'", 
alias, previousClassName, trimmedClassName);
+    }
+  }
+
+  public static TableSampler create(String type) {
+    String resolvedClassName = resolveClassName(type);
+    String classNameToLoad = resolvedClassName != null ? resolvedClassName : 
type;
+    try {
+      return PluginManager.get().createInstance(classNameToLoad);
+    } catch (Exception e) {
+      String errorMessage = resolvedClassName != null
+          ? String.format("Failed to create TableSampler for alias '%s' mapped 
to class '%s'", type,
+              resolvedClassName)
+          : "Failed to create TableSampler for type: " + type;
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRegistry() {
+    REGISTERED_TABLE_SAMPLER_CLASS_MAP.clear();
+  }
+
+  private static void registerAnnotatedTableSamplers(PinotConfiguration 
tableSamplerConfig) {
+    List<String> configuredPackages = 
tableSamplerConfig.getProperty(ANNOTATION_PACKAGES_KEY, List.of());
+    LinkedHashSet<String> combinedPackages = new 
LinkedHashSet<>(DEFAULT_ANNOTATION_PACKAGES);
+    if (configuredPackages != null) {
+      for (String packageName : configuredPackages) {
+        if (StringUtils.isNotBlank(packageName)) {
+          combinedPackages.add(packageName.trim());
+        }
+      }
+    }
+    List<String> sanitizedPackages = 
combinedPackages.stream().collect(Collectors.toList());

Review Comment:
   `combinedPackages` is already a `LinkedHashSet`, which maintains insertion 
order, so converting it to a `List` via a stream is unnecessary. Consider using 
`new ArrayList<>(combinedPackages)` for a more direct conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to