Copilot commented on code in PR #17532:
URL: https://github.com/apache/pinot/pull/17532#discussion_r2773559266


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java:
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TableSamplerIntegrationTest extends CustomDataQueryClusterIntegrationTest {
+  private static final int DAYS = 7;
+  private static final int SEGMENTS_PER_DAY = 10;
+  private static final int RECORDS_PER_SEGMENT = 1;
+  private static final int BASE_DAY = 20000;
+
+  private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch";
+
+  @Override
+  public String getTableName() {
+    return "TableSamplerIntegrationTest";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .build();
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName())
+        .setTimeColumnName(DAYS_SINCE_EPOCH_COL)
+        .setTimeType("DAYS")
+        .build();
+    tableConfig.setTableSamplers(List.of(
+        new TableSamplerConfig("firstOnly", "firstN", Map.of("numSegments", 
"1")),
+        new TableSamplerConfig("firstTwo", "firstN", Map.of("numSegments", 
"2"))));
+    return tableConfig;
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    var fieldAssembler = SchemaBuilder.record("myRecord").fields();
+    fieldAssembler.name(DAYS_SINCE_EPOCH_COL).type().intType().noDefault();
+    var avroSchema = fieldAssembler.endRecord();
+
+    List<File> files = new ArrayList<>();
+    for (int day = 0; day < DAYS; day++) {
+      int dayValue = BASE_DAY + day;
+      for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) {
+        File avroFile = new File(_tempDir, "data_day_" + day + "_seg_" + seg + ".avro");
+        try (DataFileWriter<GenericData.Record> fileWriter =
+            new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+          fileWriter.create(avroSchema, avroFile);
+          for (int docId = 0; docId < RECORDS_PER_SEGMENT; docId++) {
+            GenericData.Record record = new GenericData.Record(avroSchema);
+            record.put(DAYS_SINCE_EPOCH_COL, dayValue);
+            fileWriter.append(record);
+          }
+        }
+        files.add(avroFile);
+      }
+    }
+    return files;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFirstNSamplerForGroupByDay(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    JsonNode full = postQuery("SELECT DaysSinceEpoch, COUNT(*) AS cnt FROM " + 
getTableName()
+        + " GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch");
+    JsonNode fullRows = full.path("resultTable").path("rows");
+    Assert.assertEquals(fullRows.size(), DAYS);
+    for (int i = 0; i < DAYS; i++) {
+      Assert.assertEquals(fullRows.get(i).get(0).asInt(), BASE_DAY + i);
+      Assert.assertEquals(fullRows.get(i).get(1).asLong(), (long) SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT);
+    }
+    Assert.assertEquals(Integer.parseInt(full.path("numSegmentsQueried").asText()), DAYS * SEGMENTS_PER_DAY);

Review Comment:
   Parsing the numSegmentsQueried field as text and then converting to int is fragile. Use asInt() directly if the field is numeric, or handle the case where the field might not be present or numeric.
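   For example, a minimal sketch of the asInt() approach (hedged: the field name is taken from the test above, and the exact broker response shape is assumed):
   ```java
   // Sketch only: assumes numSegmentsQueried is returned as a numeric JSON field.
   JsonNode numSegmentsQueried = full.path("numSegmentsQueried");
   Assert.assertTrue(numSegmentsQueried.isNumber(), "numSegmentsQueried should be numeric");
   Assert.assertEquals(numSegmentsQueried.asInt(), DAYS * SEGMENTS_PER_DAY);
   ```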



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1087,10 +1309,59 @@ private Map<ServerInstance, SegmentsToQuery> getServerInstanceToSegmentsMap(Stri
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
-    RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+    RoutingEntry routingEntry = getRoutingEntry(brokerRequest, tableNameWithType);
     return routingEntry != null ? routingEntry.getSegments(brokerRequest) : null;
   }
 
+  @Nullable
+  private RoutingEntry getRoutingEntry(BrokerRequest brokerRequest, String tableNameWithType) {
+    String samplerName = extractSamplerName(brokerRequest);
+    if (StringUtils.isNotBlank(samplerName)) {
+      Map<String, RoutingEntry> samplerEntries = 
_samplerRoutingEntryMap.get(tableNameWithType);
+      RoutingEntry samplerEntry = samplerEntries != null ? 
samplerEntries.get(samplerName) : null;
+      if (samplerEntry != null) {
+        return samplerEntry;
+      }
+      if (MissingSamplerWarningTracker.shouldLogWarn(tableNameWithType, samplerName)) {
+        LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling back to default routing entry",
+            samplerName, tableNameWithType);
+      } else if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Requested sampler '{}' not found for table '{}'; falling back to default routing entry "
+            + "(subsequent occurrences logged at DEBUG level only)", samplerName, tableNameWithType);

Review Comment:
   The DEBUG message says "subsequent occurrences logged at DEBUG level only", but on its own it does not tell the reader that a WARN has already been emitted once for this sampler/table pair. Clarify that the WARN was already logged and that this and later occurrences are demoted to DEBUG.
   ```suggestion
            + "(WARN already logged once for this sampler/table; subsequent occurrences logged at DEBUG level only)",
            samplerName, tableNameWithType);
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableSamplerFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableSamplerFactory.class);
+  private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages";
+  // Keep this list in sync with built-in TableSampler locations; additional packages can be configured via broker
+  // config, but defaults should always include built-ins.
+  private static final List<String> DEFAULT_ANNOTATION_PACKAGES =
+      List.of("org.apache.pinot.broker.routing.tablesampler");
+  private static final Map<String, String> REGISTERED_TABLE_SAMPLER_CLASS_MAP 
= new ConcurrentHashMap<>();
+
+  private TableSamplerFactory() {
+  }
+
+  public static void init(PinotConfiguration tableSamplerConfig) {
+    if (tableSamplerConfig == null) {
+      return;
+    }
+    registerAnnotatedTableSamplers(tableSamplerConfig);
+  }
+
+  public static void register(String alias, String className) {
+    if (StringUtils.isBlank(alias)) {
+      LOGGER.warn("Skipping table sampler registration because alias is 
blank");
+      return;
+    }
+    if (StringUtils.isBlank(className)) {
+      LOGGER.warn("Skipping table sampler registration for alias '{}' because 
class name is blank", alias);
+      return;
+    }
+    String normalizedAlias = normalizeType(alias);
+    String trimmedClassName = className.trim();
+    String previousClassName = REGISTERED_TABLE_SAMPLER_CLASS_MAP.put(normalizedAlias, trimmedClassName);
+    if (previousClassName == null) {
+      LOGGER.info("Registered table sampler alias '{}' -> '{}'", alias, 
trimmedClassName);
+    } else if (!previousClassName.equals(trimmedClassName)) {
+      LOGGER.warn("Overriding table sampler alias '{}' from '{}' to '{}'", 
alias, previousClassName, trimmedClassName);
+    }
+  }
+
+  public static TableSampler create(String type) {
+    String resolvedClassName = resolveClassName(type);
+    String classNameToLoad = resolvedClassName != null ? resolvedClassName : type;
+    try {
+      return PluginManager.get().createInstance(classNameToLoad);
+    } catch (Exception e) {
+      String errorMessage = resolvedClassName != null
+          ? String.format("Failed to create TableSampler for alias '%s' mapped 
to class '%s'", type,
+              resolvedClassName)
+          : "Failed to create TableSampler for type: " + type;
+      throw new RuntimeException(errorMessage, e);
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRegistry() {
+    REGISTERED_TABLE_SAMPLER_CLASS_MAP.clear();
+  }
+
+  private static void registerAnnotatedTableSamplers(PinotConfiguration tableSamplerConfig) {
+    List<String> configuredPackages = 
tableSamplerConfig.getProperty(ANNOTATION_PACKAGES_KEY, List.of());

Review Comment:
   getProperty is called with a List.of() default, but PinotConfiguration.getProperty returns a String or a collection depending on how the value was actually stored. If the property is unset this returns an empty list, but if it is set as a comma-separated string it will not be parsed into a list. Use the appropriate method for retrieving list properties, or handle the string-to-list conversion explicitly.
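   A minimal sketch of the string-to-list handling (hedged: parseAnnotationPackages is a hypothetical helper, not part of the PR, and it assumes the raw value arrives as a single, possibly comma-separated String):
   ```java
   // Hypothetical helper: normalize a comma-separated string into a package list,
   // falling back to the built-in defaults when nothing is configured.
   private static List<String> parseAnnotationPackages(String rawValue) {
     if (StringUtils.isBlank(rawValue)) {
       return DEFAULT_ANNOTATION_PACKAGES;
     }
     List<String> packages = new ArrayList<>();
     for (String pkg : rawValue.split(",")) {
       String trimmed = pkg.trim();
       if (!trimmed.isEmpty()) {
         packages.add(trimmed);
       }
     }
     return packages;
   }
   ```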



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

