This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 715df15ffa Add query tests for changeCompression and enableDictionary changes (#9768)
715df15ffa is described below

commit 715df15ffaa734ec3973942f4262d964f01d33b7
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Tue Nov 8 23:54:08 2022 -0800

    Add query tests for changeCompression and enableDictionary changes (#9768)
---
 .../ForwardIndexHandlerReloadQueriesTest.java      | 693 +++++++++++++++++++++
 .../segment/index/loader/ForwardIndexHandler.java  |  12 +-
 .../index/loader/ForwardIndexHandlerTest.java      |  48 +-
 .../index/loader/SegmentPreProcessorTest.java      |  88 ++-
 4 files changed, 796 insertions(+), 45 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
new file mode 100644
index 0000000000..00408f9a46
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * The <code>ForwardIndexHandlerReloadQueriesTest</code> class sets up index segments for testing queries with
+ * reload, where the reload changes the compression of raw columns and enables dictionaries on raw columns.
+ * <p>There are 14 columns and 100000 records in total inside the original Avro file, of which 10 columns are
+ * selected to build the index segment. Information on the selected columns is as follows:
+ * <ul>
+ *   ColumnName, FieldType, DataType, Cardinality, IsSorted, HasInvertedIndex, IsMultiValue, HasDictionary, RangeIndex
+ *   <li>column1, METRIC, INT, 51594, F, F, F, F, F</li>
+ *   <li>column2, METRIC, INT, 42242, F, F, F, F, F</li>
+ *   <li>column3, DIMENSION, STRING, 5, F, F, F, F, F</li>
+ *   <li>column5, DIMENSION, STRING, 9, F, F, F, F, F</li>
+ *   <li>column6, DIMENSION, INT, 18499, F, F, T, F, F</li>
+ *   <li>column7, DIMENSION, INT, 359, F, F, T, F, F</li>
+ *   <li>column8, DIMENSION, INT, 850, F, T, F, T, F</li>
+ *   <li>column9, METRIC, INT, 146, F, T, F, T, F</li>
+ *   <li>column10, METRIC, INT, 3960, F, F, F, F, T</li>
+ *   <li>daysSinceEpoch, TIME, INT, 1, T, F, F, T, F</li>
+ * </ul>
+ */
+public class ForwardIndexHandlerReloadQueriesTest extends BaseQueriesTest {
+  private static final String AVRO_DATA = "data" + File.separator + "test_data-mv.avro";
+  private static final String SEGMENT_NAME_1 = "testTable_1756015690_1756015690";
+  private static final String SEGMENT_NAME_2 = "testTable_1756015691_1756015691";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ForwardIndexHandlerReloadQueriesTest");
+
+  // Build the segment schema.
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+          .addMetric("column2", FieldSpec.DataType.INT).addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+          .addMultiValueDimension("column6", FieldSpec.DataType.INT)
+          .addMultiValueDimension("column7", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column8", FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
+          .addMetric("column10", FieldSpec.DataType.INT)
+          .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+
+  private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
+  // Hard-coded query filter.
+  protected static final String FILTER =
+      " WHERE column1 > 100000000" + " AND column2 BETWEEN 20000000 AND 1000000000" + " AND column3 <> 'w'"
+          + " AND (column6 < 500000 OR column7 NOT IN (225, 407))" + " AND daysSinceEpoch = 1756015683";
+
+  private IndexSegment _indexSegment;
+  // Contains 2 identical index segments.
+  private List<IndexSegment> _indexSegments;
+
+  private TableConfig _tableConfig;
+  private List<String> _invertedIndexColumns;
+  private List<String> _noDictionaryColumns;
+  private List<String> _rangeIndexColumns;
+
+  @BeforeMethod
+  public void buildSegment()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    // Get resource file path.
+    URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
+    assertNotNull(resource);
+    String filePath = resource.getFile();
+
+    createSegment(filePath, SEGMENT_NAME_1);
+    createSegment(filePath, SEGMENT_NAME_2);
+
+    ImmutableSegment immutableSegment1 = loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment2 = loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+
+    // immutableSegment1 checks
+    assertNotNull(immutableSegment1.getForwardIndex("column1"));
+    assertNull(immutableSegment1.getDictionary("column1"));
+    assertNotNull(immutableSegment1.getForwardIndex("column2"));
+    assertNull(immutableSegment1.getDictionary("column2"));
+    assertNotNull(immutableSegment1.getForwardIndex("column3"));
+    assertNull(immutableSegment1.getDictionary("column3"));
+    assertNotNull(immutableSegment1.getForwardIndex("column6"));
+    assertNull(immutableSegment1.getDictionary("column6"));
+    assertNotNull(immutableSegment1.getForwardIndex("column7"));
+    assertNull(immutableSegment1.getDictionary("column7"));
+    assertNotNull(immutableSegment1.getForwardIndex("column10"));
+    assertNull(immutableSegment1.getDictionary("column10"));
+
+    // immutableSegment2 checks
+    assertNotNull(immutableSegment2.getForwardIndex("column1"));
+    assertNull(immutableSegment2.getDictionary("column1"));
+    assertNotNull(immutableSegment2.getForwardIndex("column2"));
+    assertNull(immutableSegment2.getDictionary("column2"));
+    assertNotNull(immutableSegment2.getForwardIndex("column3"));
+    assertNull(immutableSegment2.getDictionary("column3"));
+    assertNotNull(immutableSegment2.getForwardIndex("column6"));
+    assertNull(immutableSegment2.getDictionary("column6"));
+    assertNotNull(immutableSegment2.getForwardIndex("column7"));
+    assertNull(immutableSegment2.getDictionary("column7"));
+    assertNotNull(immutableSegment2.getForwardIndex("column10"));
+    assertNull(immutableSegment2.getDictionary("column10"));
+
+    _indexSegment = immutableSegment1;
+    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+  }
+
+  private void createSegment(String filePath, String segmentName)
+      throws Exception {
+    _rangeIndexColumns = new ArrayList<>(Arrays.asList("column10"));
+
+    _noDictionaryColumns =
+        new ArrayList<>(Arrays.asList("column1", "column2", "column3", "column5", "column6", "column7", "column10"));
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+    for (String column : _noDictionaryColumns) {
+      fieldConfigs.add(new FieldConfig(column, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.SNAPPY, null));
+    }
+
+    _tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(_noDictionaryColumns).setTableName("testTable")
+            .setTimeColumnName("daysSinceEpoch").setFieldConfigList(fieldConfigs).build();
+
+    // Create the segment generator config.
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, SCHEMA);
+    segmentGeneratorConfig.setInputFilePath(filePath);
+    segmentGeneratorConfig.setTableName("testTable");
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    _invertedIndexColumns = Arrays.asList("column8", "column9");
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(_invertedIndexColumns);
+    segmentGeneratorConfig.setRawIndexCreationColumns(_noDictionaryColumns);
+    segmentGeneratorConfig.setRangeIndexCreationColumns(_rangeIndexColumns);
+    // The segment generation code in SegmentColumnarIndexCreator will throw an
+    // exception if the start and end times in the time column are not in the
+    // acceptable range. For this test, we would first need to fix the input Avro
+    // data to have time column values in the allowed range. Until then, the check
+    // is explicitly disabled.
+    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+
+    // Build the index segment.
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+  }
+
+  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
+      throws Exception {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTableConfig(_tableConfig);
+    indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(_invertedIndexColumns));
+    indexLoadingConfig.setNoDictionaryColumns(new HashSet<>(_noDictionaryColumns));
+    indexLoadingConfig.setReadMode(ReadMode.heap);
+
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), indexLoadingConfig);
+
+    Map<String, ColumnMetadata> columnMetadataMap1 = immutableSegment.getSegmentMetadata().getColumnMetadataMap();
+    columnMetadataMap1.forEach((column, metadata) -> {
+      if (_invertedIndexColumns.contains(column)) {
+        assertTrue(metadata.hasDictionary());
+        assertNotNull(immutableSegment.getInvertedIndex(column));
+        assertNotNull(immutableSegment.getForwardIndex(column));
+      } else if (_noDictionaryColumns.contains(column)) {
+        assertFalse(metadata.hasDictionary());
+        assertNotNull(immutableSegment.getForwardIndex(column));
+      }
+    });
+
+    return immutableSegment;
+  }
+
+  @AfterMethod
+  public void deleteAndDestroySegment() {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    _indexSegments.forEach((IndexSegment::destroy));
+  }
+
+  @Override
+  protected String getFilter() {
+    return FILTER;
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @Test
+  public void testSelectQueriesAfterReload()
+      throws Exception {
+    String query =
+        "SELECT column1, column2, column3, column6, column7, column10 FROM testTable WHERE column10 > 674022574 AND "
+            + "column1 > 100000000 AND column2 BETWEEN 20000000 AND 1000000000 AND column3 <> 'w' AND (column6 < "
+            + "500000 OR column7 NOT IN (225, 407)) AND daysSinceEpoch = 1756015683 ORDER BY column1";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 1184L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 1384L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 913464L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+    DataSchema dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT
+    });
+    assertEquals(resultTable.getDataSchema(), dataSchema);
+    List<Object[]> resultRows1 = resultTable.getRows();
+
+    changePropertiesAndReloadSegment();
+
+    // Run the same query again.
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 1184L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 1384L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 250896L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+    dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT
+    });
+    assertEquals(resultTable.getDataSchema(), dataSchema);
+    List<Object[]> resultRows2 = resultTable.getRows();
+
+    validateBeforeAfterQueryResults(resultRows1, resultRows2);
+  }
+
+  @Test
+  public void testSelectWithDistinctQueriesAfterReload()
+      throws Exception {
+    String query = "SELECT DISTINCT column1, column2, column3, column6, column7, column10 FROM testTable ORDER BY "
+        + "column1 LIMIT 10";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 2400000L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    DataSchema dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+    });
+    assertEquals(resultTable.getDataSchema(), dataSchema);
+    List<Object[]> resultRows1 = resultTable.getRows();
+
+    changePropertiesAndReloadSegment();
+
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 2400000L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+    });
+    assertEquals(resultTable.getDataSchema(), dataSchema);
+    List<Object[]> resultRows2 = resultTable.getRows();
+
+    validateBeforeAfterQueryResults(resultRows1, resultRows2);
+  }
+
+  @Test
+  public void testSelectWithGroupByOrderByQueriesWithReload()
+      throws Exception {
+    String query =
+        "SELECT column1, column7 FROM testTable GROUP BY column1, column7 ORDER BY column1, column7 " + " LIMIT 10";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 800000L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column7"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}));
+    List<Object[]> resultRows1 = resultTable.getRows();
+    int previousVal = -1;
+    for (Object[] resultRow : resultRows1) {
+      assertEquals(resultRow.length, 2);
+      assertTrue((int) resultRow[0] >= previousVal);
+      previousVal = (int) resultRow[0];
+    }
+
+    changePropertiesAndReloadSegment();
+
+    brokerResponseNative = getBrokerResponse(query);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 800000L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column7"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}));
+    List<Object[]> resultRows2 = resultTable.getRows();
+    previousVal = -1;
+    for (Object[] resultRow : resultRows2) {
+      assertEquals(resultRow.length, 2);
+      assertTrue((int) resultRow[0] >= previousVal);
+      previousVal = (int) resultRow[0];
+    }
+
+    validateBeforeAfterQueryResults(resultRows1, resultRows2);
+  }
+
+  @Test
+  public void testSelectWithAggregationQueriesWithReload()
+      throws Exception {
+    // TEST 1.
+    String query = "SELECT MAX(ARRAYLENGTH(column7)) from testTable LIMIT 10";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400000);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(arraylength(column7))"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}));
+    List<Object[]> beforeResultRows1 = resultTable.getRows();
+
+    // TEST2
+    query =
+        "SELECT MAX(column1), MIN(column1), MAX(column2), MIN(column2), MAXMV(column6), MINMV(column6), MAXMV"
+            + "(column7), MINMV(column7), MAX(column10), MIN(column10) FROM testTable";
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{
+        "max(column1)", "min(column1)",
+        "max" + "(column2)", "min(column2)", "maxmv(column6)", "minmv(column6)",
+        "maxmv" + "(column7)", "minmv(column7)", "max(column10)", "min(column10)"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE
+    }));
+    List<Object[]> beforeResultRows2 = resultTable.getRows();
+
+    // TEST3
+    query = "SELECT column1, max(column1), sum(column10) from testTable WHERE column7 = 2147483647 GROUP BY "
+        + "column1 ORDER BY column1";
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 199_756L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 399_512L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 536360L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(),
+        new DataSchema(new String[]{"column1", "max(column1)", "sum(column10)"}, new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+        }));
+    List<Object[]> beforeResultRows3 = resultTable.getRows();
+
+    changePropertiesAndReloadSegment();
+
+    query = "SELECT MAX(ARRAYLENGTH(column7)) from testTable LIMIT 10";
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400000);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(arraylength(column7))"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}));
+    List<Object[]> afterResultRows1 = resultTable.getRows();
+
+    validateBeforeAfterQueryResults(beforeResultRows1, afterResultRows1);
+
+    query =
+        "SELECT MAX(column1), MIN(column1), MAX(column2), MIN(column2), MAXMV(column6), MINMV(column6), MAXMV"
+            + "(column7), MINMV(column7), MAX(column10), MIN(column10) FROM testTable";
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{
+        "max(column1)", "min(column1)",
+        "max" + "(column2)", "min(column2)", "maxmv(column6)", "minmv(column6)",
+        "maxmv" + "(column7)", "minmv(column7)", "max(column10)", "min(column10)"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE
+    }));
+    List<Object[]> afterResultRows2 = resultTable.getRows();
+
+    validateBeforeAfterQueryResults(beforeResultRows2, afterResultRows2);
+
+    query = "SELECT column1, max(column1), sum(column10) from testTable WHERE column7 = 2147483647 GROUP BY "
+        + "column1 ORDER BY column1";
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 199_756L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 399_512L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+    assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(),
+        new DataSchema(new String[]{"column1", "max(column1)", "sum(column10)"}, new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+        }));
+    List<Object[]> afterResultRows3 = resultTable.getRows();
+
+    validateBeforeAfterQueryResults(beforeResultRows3, afterResultRows3);
+  }
+
+  @Test
+  public void testRangeIndexAfterReload()
+      throws Exception {
+    String query = "select count(*) from testTable where column10 > 674022574";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    ResultTable resultTable1 = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 103280L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 400000L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+    DataSchema dataSchema = new DataSchema(new String[]{
+        "count(*)"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.LONG
+    });
+    assertEquals(resultTable1.getDataSchema(), dataSchema);
+    List<Object[]> resultRows1 = resultTable1.getRows();
+
+    changePropertiesAndReloadSegment();
+
+    brokerResponseNative = getBrokerResponse(query);
+    assertTrue(brokerResponseNative.getProcessingExceptions() == null
+        || brokerResponseNative.getProcessingExceptions().size() == 0);
+    resultTable1 = brokerResponseNative.getResultTable();
+    assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+    assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+    assertEquals(brokerResponseNative.getNumDocsScanned(), 103280L);
+    assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+    assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+    assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+    dataSchema = new DataSchema(new String[]{
+        "count(*)"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.LONG
+    });
+    assertEquals(resultTable1.getDataSchema(), dataSchema);
+    List<Object[]> resultRows2 = resultTable1.getRows();
+
+    validateBeforeAfterQueryResults(resultRows1, resultRows2);
+  }
+
+  private void validateBeforeAfterQueryResults(List<Object[]> beforeResults, List<Object[]> afterResults) {
+    assertEquals(beforeResults.size(), afterResults.size());
+    for (int i = 0; i < beforeResults.size(); i++) {
+      Object[] resultRow1 = beforeResults.get(i);
+      Object[] resultRow2 = afterResults.get(i);
+      assertEquals(resultRow1.length, resultRow2.length);
+      for (int j = 0; j < resultRow1.length; j++) {
+        assertEquals(resultRow1[j], resultRow2[j]);
+      }
+    }
+  }
+
+  /**
+   * As part of segment reload, the ForwardIndexHandler will perform the following operations:
+   *
+   * column1 -> Change compression.
+   * column6 -> Change compression.
+   * column3 -> Enable dictionary.
+   * column2 -> Enable dictionary. Add inverted index.
+   * column7 -> Enable dictionary. Add inverted index.
+   * column10 -> Enable dictionary.
+   */
+  private void changePropertiesAndReloadSegment()
+      throws Exception {
+    List<FieldConfig> newFieldConfigs = new ArrayList<>();
+    newFieldConfigs.add(new FieldConfig("column1", FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.ZSTANDARD, null));
+    newFieldConfigs.add(new FieldConfig("column6", FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.LZ4, null));
+    _tableConfig.setFieldConfigList(newFieldConfigs);
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.setTableConfig(_tableConfig);
+    Set<String> invertedIndexEnabledColumns = new HashSet<>(_invertedIndexColumns);
+    invertedIndexEnabledColumns.add("column2");
+    invertedIndexEnabledColumns.add("column7");
+    indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
+    Set<String> noDictionaryColumns = new HashSet<>(_noDictionaryColumns);
+    indexLoadingConfig.setNoDictionaryColumns(noDictionaryColumns);
+    indexLoadingConfig.getNoDictionaryColumns().remove("column2");
+    indexLoadingConfig.getNoDictionaryColumns().remove("column3");
+    indexLoadingConfig.getNoDictionaryColumns().remove("column7");
+    indexLoadingConfig.getNoDictionaryColumns().remove("column10");
+    Set<String> rangeIndexColumns = new HashSet<>(_rangeIndexColumns);
+    indexLoadingConfig.setRangeIndexColumns(rangeIndexColumns);
+    indexLoadingConfig.setReadMode(ReadMode.heap);
+
+    // Reload the segments to pick up the new configs
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
+    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
+    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
+    _indexSegment = immutableSegment1;
+    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+
+    // immutableSegment1 checks
+    assertNotNull(immutableSegment1.getForwardIndex("column1"));
+    assertNull(immutableSegment1.getDictionary("column1"));
+    assertNotNull(immutableSegment1.getForwardIndex("column2"));
+    assertNotNull(immutableSegment1.getDictionary("column2"));
+    assertNotNull(immutableSegment1.getForwardIndex("column3"));
+    assertNotNull(immutableSegment1.getDictionary("column3"));
+    assertNotNull(immutableSegment1.getForwardIndex("column6"));
+    assertNull(immutableSegment1.getDictionary("column6"));
+    assertNotNull(immutableSegment1.getForwardIndex("column7"));
+    assertNotNull(immutableSegment1.getDictionary("column7"));
+    assertNotNull(immutableSegment1.getForwardIndex("column10"));
+    assertNotNull(immutableSegment1.getDictionary("column10"));
+
+    // immutableSegment2 checks
+    assertNotNull(immutableSegment2.getForwardIndex("column1"));
+    assertNull(immutableSegment2.getDictionary("column1"));
+    assertNotNull(immutableSegment2.getForwardIndex("column2"));
+    assertNotNull(immutableSegment2.getDictionary("column2"));
+    assertNotNull(immutableSegment2.getForwardIndex("column3"));
+    assertNotNull(immutableSegment2.getDictionary("column3"));
+    assertNotNull(immutableSegment2.getForwardIndex("column6"));
+    assertNull(immutableSegment2.getDictionary("column6"));
+    assertNotNull(immutableSegment2.getForwardIndex("column7"));
+    assertNotNull(immutableSegment2.getDictionary("column7"));
+    assertNotNull(immutableSegment2.getForwardIndex("column10"));
+    assertNotNull(immutableSegment2.getDictionary("column10"));
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 9ec3804563..7a1d918199 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -152,12 +152,17 @@ public class ForwardIndexHandler implements IndexHandler {
 
     for (String column : existingAllColumns) {
       if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
+        // Existing column is RAW. New column is dictionary enabled.
         if (_schema == null || _indexLoadingConfig.getTableConfig() == null) {
           // This can only happen in tests.
           LOGGER.warn("Cannot enable dictionary for column={} as schema or tableConfig is null.", column);
           continue;
         }
-        // Existing column is RAW. New column is dictionary enabled.
+
+        // Note that RAW columns cannot be sorted.
+        ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+        Preconditions.checkState(!existingColMetadata.isSorted(), "Raw column=" + column + " cannot be sorted.");
+
         columnOperationMap.put(column, Operation.ENABLE_DICTIONARY);
       } else if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) {
         // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed.
@@ -439,9 +444,8 @@ public class ForwardIndexHandler implements IndexHandler {
     File dictionaryFile = new File(indexDir, column + V1Constants.Dict.FILE_EXTENSION);
     String fwdIndexFileExtension;
     if (isSingleValue) {
-      fwdIndexFileExtension =
-          existingColMetadata.isSorted() ? V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION
-              : V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+      // Raw columns cannot be sorted.
+      fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
     } else {
       fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index f49b7755ce..5492e363d7 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.index.IndexingOverrides;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -88,9 +89,6 @@ public class ForwardIndexHandlerTest {
   private static final String DIM_ZSTANDARD_BYTES = "DIM_ZSTANDARD_BYTES";
   private static final String DIM_LZ4_BYTES = "DIM_LZ4_BYTES";
 
-  // Sorted column
-  private static final String DIM_PASS_THROUGH_SORTED_LONG = "DIM_PASS_THROUGH_SORTED_LONG";
-
   // Dictionary columns
   private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
   private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
@@ -124,8 +122,7 @@ public class ForwardIndexHandlerTest {
   private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
       Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER, DIM_PASS_THROUGH_BYTES,
           METRIC_PASS_THROUGH_BIG_DECIMAL, METRIC_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_INTEGER,
-          DIM_MV_PASS_THROUGH_LONG, DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES,
-          DIM_PASS_THROUGH_SORTED_LONG);
+          DIM_MV_PASS_THROUGH_LONG, DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES);
 
   private static final List<String> RAW_LZ4_INDEX_COLUMNS =
       Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, DIM_LZ4_BYTES, METRIC_LZ4_BIG_DECIMAL,
@@ -207,7 +204,6 @@ public class ForwardIndexHandlerTest {
         .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(DIM_PASS_THROUGH_SORTED_LONG, FieldSpec.DataType.LONG)
         .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
@@ -332,9 +328,6 @@ public class ForwardIndexHandlerTest {
       row.putValue(DIM_MV_PASS_THROUGH_STRING, tempMVStringRows[i]);
       row.putValue(DIM_MV_PASS_THROUGH_BYTES, tempMVByteRows[i]);
 
-      // Sorted columns
-      row.putValue(DIM_PASS_THROUGH_SORTED_LONG, (long) i);
-
       rows.add(row);
     }
     return rows;
@@ -370,13 +363,6 @@ public class ForwardIndexHandlerTest {
     operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
 
-    // TEST4: Enable dictionary for a sorted column.
-    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
-    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_PASS_THROUGH_SORTED_LONG);
-    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(DIM_PASS_THROUGH_SORTED_LONG), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
-
     // TEST5: Enable dictionary for a dict column. Should be a No-op.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
@@ -398,7 +384,7 @@ public class ForwardIndexHandlerTest {
     operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
-    // TEST8: Add text index and disable forward index.
+    // TEST8: Add text index and enable dictionary.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getRangeIndexColumns().add(METRIC_LZ4_INTEGER);
     indexLoadingConfig.getNoDictionaryColumns().remove(METRIC_LZ4_INTEGER);
@@ -465,7 +451,7 @@ public class ForwardIndexHandlerTest {
   }
 
   @Test(priority = 1)
-  public void testRewriteRawForwardIndexForSingleColumn()
+  public void testChangeCompressionForSingleColumn()
       throws Exception {
     for (int i = 0; i < _noDictionaryColumns.size(); i++) {
       // For every noDictionaryColumn, change the compressionType to all available types, one by one.
@@ -501,6 +487,7 @@ public class ForwardIndexHandlerTest {
         segmentLocalFSDirectory.close();
 
         // Validation
+        testIndexExists(columnName, ColumnIndexType.FORWARD_INDEX);
         validateIndexMap(columnName, false);
         validateForwardIndex(columnName, newCompressionType);
 
@@ -517,7 +504,7 @@ public class ForwardIndexHandlerTest {
   }
 
   @Test(priority = 2)
-  public void testRewriteRawForwardIndexForMultipleColumns()
+  public void testChangeCompressionForMultipleColumns()
       throws Exception {
     // Setup
     SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
@@ -559,6 +546,7 @@ public class ForwardIndexHandlerTest {
     // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
     segmentLocalFSDirectory.close();
 
+    testIndexExists(column1, ColumnIndexType.FORWARD_INDEX);
     validateIndexMap(column1, false);
     validateForwardIndex(column1, newCompressionType);
     // Validate metadata properties. Nothing should change when a forwardIndex is rewritten for compressionType
@@ -569,6 +557,7 @@ public class ForwardIndexHandlerTest {
         metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
         metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue());
 
+    testIndexExists(column2, ColumnIndexType.FORWARD_INDEX);
     validateIndexMap(column2, false);
     validateForwardIndex(column2, newCompressionType);
     metadata = existingSegmentMetadata.getColumnMetadataFor(column2);
@@ -601,6 +590,8 @@ public class ForwardIndexHandlerTest {
     segmentLocalFSDirectory.close();
 
     // Col1 validation.
+    testIndexExists(col1, ColumnIndexType.FORWARD_INDEX);
+    testIndexExists(col1, ColumnIndexType.DICTIONARY);
     validateIndexMap(col1, true);
     validateForwardIndex(col1, null);
     // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
@@ -619,6 +610,8 @@ public class ForwardIndexHandlerTest {
         metadata.getMinValue(), metadata.getMaxValue());
 
     // Col2 validation.
+    testIndexExists(col2, ColumnIndexType.FORWARD_INDEX);
+    testIndexExists(col2, ColumnIndexType.DICTIONARY);
     validateIndexMap(col2, true);
     validateForwardIndex(col2, null);
     // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
@@ -657,6 +650,8 @@ public class ForwardIndexHandlerTest {
       // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
       segmentLocalFSDirectory.close();
 
+      testIndexExists(column, ColumnIndexType.FORWARD_INDEX);
+      testIndexExists(column, ColumnIndexType.DICTIONARY);
       validateIndexMap(column, true);
       validateForwardIndex(column, null);
 
@@ -735,11 +730,7 @@ public class ForwardIndexHandlerTest {
             }
             case LONG: {
               if (isSingleValue) {
-                if (isSorted) {
-                  assertEquals((long) val, rowIdx, columnName + " " + rowIdx + " " + expectedCompressionType);
-                } else {
-                  assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType);
-                }
+                assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType);
               } else {
                 Object[] values = (Object[]) val;
                 int length = values.length;
@@ -777,6 +768,15 @@ public class ForwardIndexHandlerTest {
     }
   }
 
+  private void testIndexExists(String columnName, ColumnIndexType indexType) throws Exception {
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader();
+
+    assertTrue(reader.hasIndexFor(columnName, indexType));
+  }
+
   private void validateIndexMap(String columnName, boolean dictionaryEnabled)
       throws IOException {
     // Panic validation to make sure all columns have only one forward index entry in index map.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 4ad93b8c6e..3a9eb19427 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -356,33 +356,66 @@ public class SegmentPreProcessorTest {
   }
 
   @Test
-  public void testForwardIndexHandlerEnableDictionary()
+  public void testSimpleEnableDictionarySV()
       throws Exception {
     // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
     // enabled columns
-    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
     _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW);
     _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
 
     // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns.
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
-    checkForwardIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26, null, true, 0,
-        DataType.STRING, 100000);
-    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
-        0, ChunkCompressionType.LZ4, false, DataType.STRING, 100000);
-    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
-        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4,
+        true, 0, DataType.STRING, 100000);
     validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW, 42242, 16, _schema, false, false, false, 0, true,
         0, ChunkCompressionType.LZ4, false, DataType.INT, 100000);
 
     // Convert the segment to V3.
     new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
 
-    // TEST 2: Run reload with no-changes.
-    checkForwardIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26, null, true, 0,
+    // TEST 2: Enable dictionary on EXISTING_STRING_COL_RAW
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, null, true, 0,
         DataType.STRING, 100000);
 
-    // TEST 3: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted index and text index. Reload code path
+    // TEST 3: Enable dictionary on EXISTING_INT_COL_RAW
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW, 42242, 16, _schema, false, true, false, 0, null, true, 0,
+        DataType.INT, 100000);
+  }
+
+  @Test
+  public void testSimpleEnableDictionaryMV()
+      throws Exception {
+    // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
+    // enabled columns
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
+
+    // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns.
+    constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+        ChunkCompressionType.LZ4, false, 13, DataType.INT, 106688);
+
+    // Convert the segment to V3.
+    new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+
+    // TEST 2: Enable dictionary on EXISTING_INT_COL_RAW_MV
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, null, false, 13,
+        DataType.INT, 106688);
+  }
+
+  @Test
+  public void testEnableDictAndOtherIndexesSV()
+      throws Exception {
+    // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
+    // enabled columns
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW);
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
+    constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+    new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+
+    // TEST 1: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted index and text index. Reload code path
     // will create dictionary, inverted index and text index.
     _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW);
     _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_STRING_COL_RAW);
@@ -394,7 +427,7 @@ public class SegmentPreProcessorTest {
     validateIndex(ColumnIndexType.TEXT_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, true, 0,
         null, false, DataType.STRING, 100000);
 
-    // TEST4: EXISTING_STRING_COL_RAW. Enable dictionary on a raw column that already has text index.
+    // TEST 2: EXISTING_STRING_COL_RAW. Enable dictionary on a raw column that already has text index.
     List<String> textIndexCols = new ArrayList<>();
     textIndexCols.add(EXISTING_STRING_COL_RAW);
     constructV1Segment(Collections.emptyList(), textIndexCols, Collections.emptyList());
@@ -410,7 +443,7 @@ public class SegmentPreProcessorTest {
     // Add it back so that this column is not rewritten for the other tests below.
     _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
 
-    // TEST 5: EXISTING_INT_COL_RAW. Enable dictionary on a column that already has range index.
+    // TEST 3: EXISTING_INT_COL_RAW. Enable dictionary on a column that already has range index.
     List<String> rangeIndexCols = new ArrayList<>();
     rangeIndexCols.add(EXISTING_INT_COL_RAW);
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), rangeIndexCols);
@@ -427,14 +460,18 @@ public class SegmentPreProcessorTest {
         null, false, DataType.INT, 100000);
     // Add it back so that this column is not rewritten for the other tests below.
     _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW);
+  }
 
-    // TEST 6: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. Also enable inverted index and range index.
+  @Test
+  public void testEnableDictAndOtherIndexesMV()
+      throws Exception {
+    // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
+    // enabled columns
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
     new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
-    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
-        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
 
-    // Enable dictionary and inverted index.
+    // TEST 1: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. Also enable inverted index and range index.
     _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
     _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_INT_COL_RAW_MV);
     _indexLoadingConfig.getRangeIndexColumns().add(EXISTING_INT_COL_RAW_MV);
@@ -444,6 +481,23 @@ public class SegmentPreProcessorTest {
         false, 13, null, false, DataType.INT, 106688);
     validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0,
         false, 13, null, false, DataType.INT, 106688);
+
+    // TEST 2: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column that already has range index.
+    List<String> rangeIndexCols = new ArrayList<>();
+    rangeIndexCols.add(EXISTING_INT_COL_RAW_MV);
+    constructV1Segment(Collections.emptyList(), Collections.emptyList(), rangeIndexCols);
+    new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+
+    // Enable dictionary.
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, null, false, 13,
+        DataType.INT, 106688);
+    validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0,
+        false, 13, null, false, DataType.INT, 106688);
   }
 
   @Test

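For readers who want the gist of what the new test exercises without reading the whole patch, the reload flow can be condensed as below. This is an illustrative sketch, not part of the commit: it reuses the FieldConfig, IndexLoadingConfig, and reloadSegment(...) calls that appear verbatim in ForwardIndexHandlerReloadQueriesTest#changePropertiesAndReloadSegment above, and it assumes the surrounding test fixture (tableConfig, INDEX_DIR, SEGMENT_NAME_1, SCHEMA) is in scope.

    // changeCompression: keep column1 RAW but switch its codec from SNAPPY to ZSTANDARD.
    List<FieldConfig> fieldConfigs = new ArrayList<>();
    fieldConfigs.add(new FieldConfig("column1", FieldConfig.EncodingType.RAW, Collections.emptyList(),
        FieldConfig.CompressionCodec.ZSTANDARD, null));
    tableConfig.setFieldConfigList(fieldConfigs);

    // enableDictionary: dropping a column from noDictionaryColumns makes the
    // ForwardIndexHandler build a dictionary for it during reload.
    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
    indexLoadingConfig.getNoDictionaryColumns().remove("column10");

    // Reloading the segment applies both operations in place; the test asserts that
    // query results before and after the reload are identical.
    ImmutableSegment reloaded = reloadSegment(new File(INDEX_DIR, SEGMENT_NAME_1), indexLoadingConfig, SCHEMA);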

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
