Copilot commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2545724192


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java:
##########
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
+  private static final String TABLE_NAME = "DistinctQueriesCustomTest";
+
+  private static final String INT_COL = "intCol";
+  private static final String LONG_COL = "longCol";
+  private static final String DOUBLE_COL = "doubleCol";
+  private static final String STRING_COL = "stringCol";
+  private static final String MV_INT_COL = "intArrayCol";
+  private static final String MV_STRING_COL = "stringArrayCol";
+
+  private static final int NUM_ROWS = 24_000;
+  private static final int NUM_INT_VALUES = 5;
+  private static final int NUM_LONG_VALUES = 5;
+  private static final int NUM_DOUBLE_VALUES = 3;
+  private static final int NUM_STRING_VALUES = 4;
+  private static final int NUM_MV_INT_VALUES = 3;
+  private static final long LONG_BASE_VALUE = 1_000L;
+  private static final double DOUBLE_OFFSET = 0.25d;
+  private static final int MV_OFFSET = 50;
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_ROWS;
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+        .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema =
+        SchemaBuilder.record("DistinctRecord").fields()
+            .requiredInt(INT_COL)
+            .requiredLong(LONG_COL)
+            .requiredDouble(DOUBLE_COL)
+            .requiredString(STRING_COL)
+            .name(MV_INT_COL).type().array().items().intType().noDefault()
+            
.name(MV_STRING_COL).type().array().items().stringType().noDefault()
+            .endRecord();
+
+    File avroFile = new File(_tempDir, "distinct-data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_ROWS; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, getIntValue(i));
+        record.put(LONG_COL, getLongValue(i));
+        record.put(DOUBLE_COL, getDoubleValue(i));
+        record.put(STRING_COL, getStringValue(i));
+        record.put(MV_INT_COL, List.of(getMultiValueBase(i), 
getMultiValueBase(i) + MV_OFFSET));
+        record.put(MV_STRING_COL, List.of(getStringValue(i), getStringValue(i 
+ MV_OFFSET)));
+        writer.append(record);
+      }
+    }
+    return List.of(avroFile);
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  private int getIntValue(int recordId) {
+    return recordId % NUM_INT_VALUES;
+  }
+
+  private long getLongValue(int recordId) {
+    return LONG_BASE_VALUE + (recordId % NUM_LONG_VALUES);
+  }
+
+  private double getDoubleValue(int recordId) {
+    return (recordId % NUM_DOUBLE_VALUES) + DOUBLE_OFFSET;
+  }
+
+  private String getStringValue(int recordId) {
+    return "type_" + (recordId % NUM_STRING_VALUES);
+  }
+
+  private int getMultiValueBase(int recordId) {
+    return recordId % NUM_MV_INT_VALUES;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctSingleValuedColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    assertDistinctColumnValues(INT_COL, getExpectedIntValues(), 
JsonNode::asInt);
+    assertDistinctColumnValues(LONG_COL, getExpectedLongValues(), 
JsonNode::asLong);
+    assertDistinctColumnValues(DOUBLE_COL, getExpectedDoubleValues(), 
JsonNode::asDouble);
+    assertDistinctColumnValues(STRING_COL, getExpectedStringValues(), 
JsonNode::textValue);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultiValueColumn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = useMultiStageQueryEngine ? String.format("SELECT %s FROM %s 
GROUP BY ARRAY_TO_MV(%s)", MV_INT_COL,
+        getTableName(), MV_INT_COL) : String.format("SELECT DISTINCT %s FROM 
%s", MV_INT_COL, getTableName());
+    JsonNode result = postQuery(query);
+    System.out.println("result = " + result);

Review Comment:
   Debug print statement should be removed. This is an integration test, so the
output is unnecessary once the assertions pass; if diagnostic output is genuinely
useful for debugging failures, use the test class's logger instead of
System.out.println.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java:
##########
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
+  private static final String TABLE_NAME = "DistinctQueriesCustomTest";
+
+  private static final String INT_COL = "intCol";
+  private static final String LONG_COL = "longCol";
+  private static final String DOUBLE_COL = "doubleCol";
+  private static final String STRING_COL = "stringCol";
+  private static final String MV_INT_COL = "intArrayCol";
+  private static final String MV_STRING_COL = "stringArrayCol";
+
+  private static final int NUM_ROWS = 24_000;
+  private static final int NUM_INT_VALUES = 5;
+  private static final int NUM_LONG_VALUES = 5;
+  private static final int NUM_DOUBLE_VALUES = 3;
+  private static final int NUM_STRING_VALUES = 4;
+  private static final int NUM_MV_INT_VALUES = 3;
+  private static final long LONG_BASE_VALUE = 1_000L;
+  private static final double DOUBLE_OFFSET = 0.25d;
+  private static final int MV_OFFSET = 50;
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_ROWS;
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+        .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema =
+        SchemaBuilder.record("DistinctRecord").fields()
+            .requiredInt(INT_COL)
+            .requiredLong(LONG_COL)
+            .requiredDouble(DOUBLE_COL)
+            .requiredString(STRING_COL)
+            .name(MV_INT_COL).type().array().items().intType().noDefault()
+            
.name(MV_STRING_COL).type().array().items().stringType().noDefault()
+            .endRecord();
+
+    File avroFile = new File(_tempDir, "distinct-data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_ROWS; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, getIntValue(i));
+        record.put(LONG_COL, getLongValue(i));
+        record.put(DOUBLE_COL, getDoubleValue(i));
+        record.put(STRING_COL, getStringValue(i));
+        record.put(MV_INT_COL, List.of(getMultiValueBase(i), 
getMultiValueBase(i) + MV_OFFSET));
+        record.put(MV_STRING_COL, List.of(getStringValue(i), getStringValue(i 
+ MV_OFFSET)));
+        writer.append(record);
+      }
+    }
+    return List.of(avroFile);
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  private int getIntValue(int recordId) {
+    return recordId % NUM_INT_VALUES;
+  }
+
+  private long getLongValue(int recordId) {
+    return LONG_BASE_VALUE + (recordId % NUM_LONG_VALUES);
+  }
+
+  private double getDoubleValue(int recordId) {
+    return (recordId % NUM_DOUBLE_VALUES) + DOUBLE_OFFSET;
+  }
+
+  private String getStringValue(int recordId) {
+    return "type_" + (recordId % NUM_STRING_VALUES);
+  }
+
+  private int getMultiValueBase(int recordId) {
+    return recordId % NUM_MV_INT_VALUES;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctSingleValuedColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    assertDistinctColumnValues(INT_COL, getExpectedIntValues(), 
JsonNode::asInt);
+    assertDistinctColumnValues(LONG_COL, getExpectedLongValues(), 
JsonNode::asLong);
+    assertDistinctColumnValues(DOUBLE_COL, getExpectedDoubleValues(), 
JsonNode::asDouble);
+    assertDistinctColumnValues(STRING_COL, getExpectedStringValues(), 
JsonNode::textValue);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultiValueColumn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = useMultiStageQueryEngine ? String.format("SELECT %s FROM %s 
GROUP BY ARRAY_TO_MV(%s)", MV_INT_COL,
+        getTableName(), MV_INT_COL) : String.format("SELECT DISTINCT %s FROM 
%s", MV_INT_COL, getTableName());
+    JsonNode result = postQuery(query);
+    System.out.println("result = " + result);
+    JsonNode rows = result.get("resultTable").get("rows");
+    Set<Integer> actual = new HashSet<>();
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt());
+    }
+    assertEquals(actual, getExpectedMultiValueEntries());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultipleColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT DISTINCT %s, %s FROM %s ORDER BY %s, %s", 
INT_COL, STRING_COL, getTableName(), INT_COL,
+            STRING_COL);
+    JsonNode rows = postQuery(query).get("resultTable").get("rows");
+    List<String> actual = new ArrayList<>(rows.size());
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt() + "|" + row.get(1).textValue());
+    }
+    assertEquals(actual, getExpectedIntStringPairs());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testMaxRowsInDistinctEarlyTermination(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sql = String.format("SET \"%s\" = 3; SELECT DISTINCT %s FROM %s 
WHERE rand() < 1.0 LIMIT 100",
+        QueryOptionKey.MAX_ROWS_IN_DISTINCT, STRING_COL, getTableName());
+    JsonNode response = postQuery(sql);
+    System.out.println("response = " + response);
+    assertTrue(response.get("maxRowsInDistinctReached").asBoolean(), "expected 
maxRowsInDistinctReached flag");
+    assertTrue(response.get("partialResult").asBoolean(), "partialResult 
should be true");
+    assertTrue(response.get("resultTable").get("rows").size() <= 3, "row count 
should honor threshold");
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNoChangeEarlyTermination(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sql = String.format("SET \"%s\" = 5000; SELECT DISTINCT %s FROM %s 
WHERE rand() < 1.0 LIMIT 100",
+        QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT, INT_COL, 
getTableName());
+    JsonNode response = postQuery(sql);
+    System.out.println("response = " + response);

Review Comment:
   Debug print statement should be removed. This is an integration test, so the
output is unnecessary once the assertions pass; if diagnostic output is genuinely
useful for debugging failures, use the test class's logger instead of
System.out.println.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java:
##########
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
+  private static final String TABLE_NAME = "DistinctQueriesCustomTest";
+
+  private static final String INT_COL = "intCol";
+  private static final String LONG_COL = "longCol";
+  private static final String DOUBLE_COL = "doubleCol";
+  private static final String STRING_COL = "stringCol";
+  private static final String MV_INT_COL = "intArrayCol";
+  private static final String MV_STRING_COL = "stringArrayCol";
+
+  private static final int NUM_ROWS = 24_000;
+  private static final int NUM_INT_VALUES = 5;
+  private static final int NUM_LONG_VALUES = 5;
+  private static final int NUM_DOUBLE_VALUES = 3;
+  private static final int NUM_STRING_VALUES = 4;
+  private static final int NUM_MV_INT_VALUES = 3;
+  private static final long LONG_BASE_VALUE = 1_000L;
+  private static final double DOUBLE_OFFSET = 0.25d;
+  private static final int MV_OFFSET = 50;
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_ROWS;
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+        .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema =
+        SchemaBuilder.record("DistinctRecord").fields()
+            .requiredInt(INT_COL)
+            .requiredLong(LONG_COL)
+            .requiredDouble(DOUBLE_COL)
+            .requiredString(STRING_COL)
+            .name(MV_INT_COL).type().array().items().intType().noDefault()
+            
.name(MV_STRING_COL).type().array().items().stringType().noDefault()
+            .endRecord();
+
+    File avroFile = new File(_tempDir, "distinct-data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_ROWS; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, getIntValue(i));
+        record.put(LONG_COL, getLongValue(i));
+        record.put(DOUBLE_COL, getDoubleValue(i));
+        record.put(STRING_COL, getStringValue(i));
+        record.put(MV_INT_COL, List.of(getMultiValueBase(i), 
getMultiValueBase(i) + MV_OFFSET));
+        record.put(MV_STRING_COL, List.of(getStringValue(i), getStringValue(i 
+ MV_OFFSET)));
+        writer.append(record);
+      }
+    }
+    return List.of(avroFile);
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  private int getIntValue(int recordId) {
+    return recordId % NUM_INT_VALUES;
+  }
+
+  private long getLongValue(int recordId) {
+    return LONG_BASE_VALUE + (recordId % NUM_LONG_VALUES);
+  }
+
+  private double getDoubleValue(int recordId) {
+    return (recordId % NUM_DOUBLE_VALUES) + DOUBLE_OFFSET;
+  }
+
+  private String getStringValue(int recordId) {
+    return "type_" + (recordId % NUM_STRING_VALUES);
+  }
+
+  private int getMultiValueBase(int recordId) {
+    return recordId % NUM_MV_INT_VALUES;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctSingleValuedColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    assertDistinctColumnValues(INT_COL, getExpectedIntValues(), 
JsonNode::asInt);
+    assertDistinctColumnValues(LONG_COL, getExpectedLongValues(), 
JsonNode::asLong);
+    assertDistinctColumnValues(DOUBLE_COL, getExpectedDoubleValues(), 
JsonNode::asDouble);
+    assertDistinctColumnValues(STRING_COL, getExpectedStringValues(), 
JsonNode::textValue);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultiValueColumn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = useMultiStageQueryEngine ? String.format("SELECT %s FROM %s 
GROUP BY ARRAY_TO_MV(%s)", MV_INT_COL,
+        getTableName(), MV_INT_COL) : String.format("SELECT DISTINCT %s FROM 
%s", MV_INT_COL, getTableName());
+    JsonNode result = postQuery(query);
+    System.out.println("result = " + result);
+    JsonNode rows = result.get("resultTable").get("rows");
+    Set<Integer> actual = new HashSet<>();
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt());
+    }
+    assertEquals(actual, getExpectedMultiValueEntries());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultipleColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT DISTINCT %s, %s FROM %s ORDER BY %s, %s", 
INT_COL, STRING_COL, getTableName(), INT_COL,
+            STRING_COL);
+    JsonNode rows = postQuery(query).get("resultTable").get("rows");
+    List<String> actual = new ArrayList<>(rows.size());
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt() + "|" + row.get(1).textValue());
+    }
+    assertEquals(actual, getExpectedIntStringPairs());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testMaxRowsInDistinctEarlyTermination(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sql = String.format("SET \"%s\" = 3; SELECT DISTINCT %s FROM %s 
WHERE rand() < 1.0 LIMIT 100",
+        QueryOptionKey.MAX_ROWS_IN_DISTINCT, STRING_COL, getTableName());
+    JsonNode response = postQuery(sql);
+    System.out.println("response = " + response);
+    assertTrue(response.get("maxRowsInDistinctReached").asBoolean(), "expected 
maxRowsInDistinctReached flag");
+    assertTrue(response.get("partialResult").asBoolean(), "partialResult 
should be true");
+    assertTrue(response.get("resultTable").get("rows").size() <= 3, "row count 
should honor threshold");
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNoChangeEarlyTermination(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sql = String.format("SET \"%s\" = 5000; SELECT DISTINCT %s FROM %s 
WHERE rand() < 1.0 LIMIT 100",
+        QueryOptionKey.NUM_ROWS_WITHOUT_CHANGE_IN_DISTINCT, INT_COL, 
getTableName());
+    JsonNode response = postQuery(sql);
+    System.out.println("response = " + response);
+    
assertTrue(response.get("numRowsWithoutChangeInDistinctReached").asBoolean(),
+        "expected no-change flag to be set");
+    assertTrue(response.get("partialResult").asBoolean(), "partialResult 
should be true");
+  }
+
+  private <T> void assertDistinctColumnValues(String column, List<T> expected,
+      java.util.function.Function<JsonNode, T> parser)
+      throws Exception {
+    String query = String.format("SELECT DISTINCT %s FROM %s ORDER BY %s", 
column, getTableName(), column);
+    JsonNode rows = postQuery(query).get("resultTable").get("rows");
+    List<T> actual = new ArrayList<>(rows.size());
+    for (JsonNode row : rows) {
+      actual.add(parser.apply(row.get(0)));
+    }
+    assertEquals(actual, expected);
+  }
+
+  private List<Integer> getExpectedIntValues() {
+    return IntStream.range(0, 
NUM_INT_VALUES).boxed().collect(Collectors.toList());
+  }
+
+  private List<Long> getExpectedLongValues() {
+    return IntStream.range(0, NUM_LONG_VALUES).mapToObj(i -> LONG_BASE_VALUE + 
i).collect(Collectors.toList());
+  }
+
+  private List<Double> getExpectedDoubleValues() {
+    return IntStream.range(0, NUM_DOUBLE_VALUES).mapToObj(i -> i + 
DOUBLE_OFFSET).collect(Collectors.toList());
+  }
+
+  private List<String> getExpectedStringValues() {
+    return IntStream.range(0, NUM_STRING_VALUES).mapToObj(i -> "type_" + 
i).collect(Collectors.toList());
+  }
+
+  private Set<Integer> getExpectedMultiValueEntries() {
+    Set<Integer> entries = new HashSet<>();
+    for (int i = 0; i < NUM_MV_INT_VALUES; i++) {

Review Comment:
   The method only adds values with MV_OFFSET but based on line 107 in 
createAvroFiles(), the multi-value array includes both the base value and base 
+ MV_OFFSET. This method should also add the base values (0 to 
NUM_MV_INT_VALUES-1) to match the actual data.
   ```suggestion
       for (int i = 0; i < NUM_MV_INT_VALUES; i++) {
         entries.add(i);
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java:
##########
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
+  private static final String TABLE_NAME = "DistinctQueriesCustomTest";
+
+  private static final String INT_COL = "intCol";
+  private static final String LONG_COL = "longCol";
+  private static final String DOUBLE_COL = "doubleCol";
+  private static final String STRING_COL = "stringCol";
+  private static final String MV_INT_COL = "intArrayCol";
+  private static final String MV_STRING_COL = "stringArrayCol";
+
+  private static final int NUM_ROWS = 24_000;
+  private static final int NUM_INT_VALUES = 5;
+  private static final int NUM_LONG_VALUES = 5;
+  private static final int NUM_DOUBLE_VALUES = 3;
+  private static final int NUM_STRING_VALUES = 4;
+  private static final int NUM_MV_INT_VALUES = 3;
+  private static final long LONG_BASE_VALUE = 1_000L;
+  private static final double DOUBLE_OFFSET = 0.25d;
+  private static final int MV_OFFSET = 50;
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_ROWS;
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+        .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema =
+        SchemaBuilder.record("DistinctRecord").fields()
+            .requiredInt(INT_COL)
+            .requiredLong(LONG_COL)
+            .requiredDouble(DOUBLE_COL)
+            .requiredString(STRING_COL)
+            .name(MV_INT_COL).type().array().items().intType().noDefault()
+            
.name(MV_STRING_COL).type().array().items().stringType().noDefault()
+            .endRecord();
+
+    File avroFile = new File(_tempDir, "distinct-data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_ROWS; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, getIntValue(i));
+        record.put(LONG_COL, getLongValue(i));
+        record.put(DOUBLE_COL, getDoubleValue(i));
+        record.put(STRING_COL, getStringValue(i));
+        record.put(MV_INT_COL, List.of(getMultiValueBase(i), 
getMultiValueBase(i) + MV_OFFSET));
+        record.put(MV_STRING_COL, List.of(getStringValue(i), getStringValue(i 
+ MV_OFFSET)));
+        writer.append(record);
+      }
+    }
+    return List.of(avroFile);
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  private int getIntValue(int recordId) {
+    return recordId % NUM_INT_VALUES;
+  }
+
+  private long getLongValue(int recordId) {
+    return LONG_BASE_VALUE + (recordId % NUM_LONG_VALUES);
+  }
+
+  private double getDoubleValue(int recordId) {
+    return (recordId % NUM_DOUBLE_VALUES) + DOUBLE_OFFSET;
+  }
+
+  private String getStringValue(int recordId) {
+    return "type_" + (recordId % NUM_STRING_VALUES);
+  }
+
+  private int getMultiValueBase(int recordId) {
+    return recordId % NUM_MV_INT_VALUES;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctSingleValuedColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    assertDistinctColumnValues(INT_COL, getExpectedIntValues(), 
JsonNode::asInt);
+    assertDistinctColumnValues(LONG_COL, getExpectedLongValues(), 
JsonNode::asLong);
+    assertDistinctColumnValues(DOUBLE_COL, getExpectedDoubleValues(), 
JsonNode::asDouble);
+    assertDistinctColumnValues(STRING_COL, getExpectedStringValues(), 
JsonNode::textValue);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultiValueColumn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = useMultiStageQueryEngine ? String.format("SELECT %s FROM %s 
GROUP BY ARRAY_TO_MV(%s)", MV_INT_COL,
+        getTableName(), MV_INT_COL) : String.format("SELECT DISTINCT %s FROM 
%s", MV_INT_COL, getTableName());
+    JsonNode result = postQuery(query);
+    System.out.println("result = " + result);
+    JsonNode rows = result.get("resultTable").get("rows");
+    Set<Integer> actual = new HashSet<>();
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt());
+    }
+    assertEquals(actual, getExpectedMultiValueEntries());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultipleColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT DISTINCT %s, %s FROM %s ORDER BY %s, %s", 
INT_COL, STRING_COL, getTableName(), INT_COL,
+            STRING_COL);
+    JsonNode rows = postQuery(query).get("resultTable").get("rows");
+    List<String> actual = new ArrayList<>(rows.size());
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt() + "|" + row.get(1).textValue());
+    }
+    assertEquals(actual, getExpectedIntStringPairs());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testMaxRowsInDistinctEarlyTermination(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sql = String.format("SET \"%s\" = 3; SELECT DISTINCT %s FROM %s 
WHERE rand() < 1.0 LIMIT 100",
+        QueryOptionKey.MAX_ROWS_IN_DISTINCT, STRING_COL, getTableName());
+    JsonNode response = postQuery(sql);
+    System.out.println("response = " + response);

Review Comment:
   Debug print statement should be removed. This is an integration test, so the
output is unnecessary once the assertions pass; if diagnostic output is genuinely
useful for debugging failures, use the test class's logger instead of
System.out.println.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to