This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 715df15ffa Add query tests for changeCompression and enableDictionary changes (#9768)
715df15ffa is described below
commit 715df15ffaa734ec3973942f4262d964f01d33b7
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Tue Nov 8 23:54:08 2022 -0800
Add query tests for changeCompression and enableDictionary changes (#9768)
---
.../ForwardIndexHandlerReloadQueriesTest.java | 693 +++++++++++++++++++++
.../segment/index/loader/ForwardIndexHandler.java | 12 +-
.../index/loader/ForwardIndexHandlerTest.java | 48 +-
.../index/loader/SegmentPreProcessorTest.java | 88 ++-
4 files changed, 796 insertions(+), 45 deletions(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
new file mode 100644
index 0000000000..00408f9a46
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * The <code>ForwardIndexHandlerReloadQueriesTest</code> class sets up the index segments for the
+ * ForwardIndexHandler reload queries test, which covers compression changes and dictionary enabling on reload.
+ * <p>The original Avro file contains 14 columns and 100000 records, of which 10 columns are selected to build
+ * the index segment. The selected columns' information is as follows:
+ * <ul>
+ *   ColumnName, FieldType, DataType, Cardinality, IsSorted, HasInvertedIndex, IsMultiValue, HasDictionary, RangeIndex
+ * <li>column1, METRIC, INT, 51594, F, F, F, F, F</li>
+ * <li>column2, METRIC, INT, 42242, F, F, F, F, F</li>
+ * <li>column3, DIMENSION, STRING, 5, F, F, F, F, F</li>
+ * <li>column5, DIMENSION, STRING, 9, F, F, F, F, F</li>
+ * <li>column6, DIMENSION, INT, 18499, F, F, T, F, F</li>
+ * <li>column7, DIMENSION, INT, 359, F, F, T, F, F</li>
+ * <li>column8, DIMENSION, INT, 850, F, T, F, T, F</li>
+ * <li>column9, METRIC, INT, 146, F, T, F, T, F</li>
+ * <li>column10, METRIC, INT, 3960, F, F, F, F, T</li>
+ * <li>daysSinceEpoch, TIME, INT, 1, T, F, F, T, F</li>
+ * </ul>
+ */
+public class ForwardIndexHandlerReloadQueriesTest extends BaseQueriesTest {
+  private static final String AVRO_DATA = "data" + File.separator + "test_data-mv.avro";
+  private static final String SEGMENT_NAME_1 = "testTable_1756015690_1756015690";
+  private static final String SEGMENT_NAME_2 = "testTable_1756015691_1756015691";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ForwardIndexHandlerReloadQueriesTest");
+
+ // Build the segment schema.
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+          .addMetric("column2", FieldSpec.DataType.INT).addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+          .addMultiValueDimension("column6", FieldSpec.DataType.INT)
+          .addMultiValueDimension("column7", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column8", FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
+          .addMetric("column10", FieldSpec.DataType.INT)
+          .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+
+ private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
+ // Hard-coded query filter.
+  protected static final String FILTER =
+      " WHERE column1 > 100000000" + " AND column2 BETWEEN 20000000 AND 1000000000" + " AND column3 <> 'w'"
+          + " AND (column6 < 500000 OR column7 NOT IN (225, 407))" + " AND daysSinceEpoch = 1756015683";
+
+ private IndexSegment _indexSegment;
+ // Contains 2 identical index segments.
+ private List<IndexSegment> _indexSegments;
+
+ private TableConfig _tableConfig;
+ private List<String> _invertedIndexColumns;
+ private List<String> _noDictionaryColumns;
+ private List<String> _rangeIndexColumns;
+
+ @BeforeMethod
+ public void buildSegment()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ // Get resource file path.
+ URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
+ assertNotNull(resource);
+ String filePath = resource.getFile();
+
+ createSegment(filePath, SEGMENT_NAME_1);
+ createSegment(filePath, SEGMENT_NAME_2);
+
+    ImmutableSegment immutableSegment1 = loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment2 = loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+
+ // immutableSegment1 checks
+ assertNotNull(immutableSegment1.getForwardIndex("column1"));
+ assertNull(immutableSegment1.getDictionary("column1"));
+ assertNotNull(immutableSegment1.getForwardIndex("column2"));
+ assertNull(immutableSegment1.getDictionary("column2"));
+ assertNotNull(immutableSegment1.getForwardIndex("column3"));
+ assertNull(immutableSegment1.getDictionary("column3"));
+ assertNotNull(immutableSegment1.getForwardIndex("column6"));
+ assertNull(immutableSegment1.getDictionary("column6"));
+ assertNotNull(immutableSegment1.getForwardIndex("column7"));
+ assertNull(immutableSegment1.getDictionary("column7"));
+ assertNotNull(immutableSegment1.getForwardIndex("column10"));
+ assertNull(immutableSegment1.getDictionary("column10"));
+
+ // immutableSegment2 checks
+ assertNotNull(immutableSegment2.getForwardIndex("column1"));
+ assertNull(immutableSegment2.getDictionary("column1"));
+ assertNotNull(immutableSegment2.getForwardIndex("column2"));
+ assertNull(immutableSegment2.getDictionary("column2"));
+ assertNotNull(immutableSegment2.getForwardIndex("column3"));
+ assertNull(immutableSegment2.getDictionary("column3"));
+ assertNotNull(immutableSegment2.getForwardIndex("column6"));
+ assertNull(immutableSegment2.getDictionary("column6"));
+ assertNotNull(immutableSegment2.getForwardIndex("column7"));
+ assertNull(immutableSegment2.getDictionary("column7"));
+ assertNotNull(immutableSegment2.getForwardIndex("column10"));
+ assertNull(immutableSegment2.getDictionary("column10"));
+
+ _indexSegment = immutableSegment1;
+ _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+ }
+
+ private void createSegment(String filePath, String segmentName)
+ throws Exception {
+ _rangeIndexColumns = new ArrayList<>(Arrays.asList("column10"));
+
+    _noDictionaryColumns =
+        new ArrayList<>(Arrays.asList("column1", "column2", "column3", "column5", "column6", "column7", "column10"));
+ List<FieldConfig> fieldConfigs = new ArrayList<>();
+ for (String column : _noDictionaryColumns) {
+      fieldConfigs.add(new FieldConfig(column, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.SNAPPY, null));
+ }
+
+    _tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(_noDictionaryColumns).setTableName("testTable")
+            .setTimeColumnName("daysSinceEpoch").setFieldConfigList(fieldConfigs).build();
+
+ // Create the segment generator config.
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, SCHEMA);
+ segmentGeneratorConfig.setInputFilePath(filePath);
+ segmentGeneratorConfig.setTableName("testTable");
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+ segmentGeneratorConfig.setSegmentName(segmentName);
+ _invertedIndexColumns = Arrays.asList("column8", "column9");
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(_invertedIndexColumns);
+ segmentGeneratorConfig.setRawIndexCreationColumns(_noDictionaryColumns);
+ segmentGeneratorConfig.setRangeIndexCreationColumns(_rangeIndexColumns);
+ // The segment generation code in SegmentColumnarIndexCreator will throw
+ // exception if start and end time in time column are not in acceptable
+ // range. For this test, we first need to fix the input avro data
+ // to have the time column values in allowed range. Until then, the check
+ // is explicitly disabled
+ segmentGeneratorConfig.setSkipTimeValueCheck(true);
+
+ // Build the index segment.
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ }
+
+ private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
+ throws Exception {
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+ indexLoadingConfig.setTableConfig(_tableConfig);
+    indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(_invertedIndexColumns));
+    indexLoadingConfig.setNoDictionaryColumns(new HashSet<>(_noDictionaryColumns));
+ indexLoadingConfig.setReadMode(ReadMode.heap);
+
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), indexLoadingConfig);
+
+    Map<String, ColumnMetadata> columnMetadataMap1 = immutableSegment.getSegmentMetadata().getColumnMetadataMap();
+ columnMetadataMap1.forEach((column, metadata) -> {
+ if (_invertedIndexColumns.contains(column)) {
+ assertTrue(metadata.hasDictionary());
+ assertNotNull(immutableSegment.getInvertedIndex(column));
+ assertNotNull(immutableSegment.getForwardIndex(column));
+ } else if (_noDictionaryColumns.contains(column)) {
+ assertFalse(metadata.hasDictionary());
+ assertNotNull(immutableSegment.getForwardIndex(column));
+ }
+ });
+
+ return immutableSegment;
+ }
+
+ @AfterMethod
+ public void deleteAndDestroySegment() {
+ FileUtils.deleteQuietly(INDEX_DIR);
+ _indexSegments.forEach((IndexSegment::destroy));
+ }
+
+ @Override
+ protected String getFilter() {
+ return FILTER;
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @Test
+ public void testSelectQueriesAfterReload()
+ throws Exception {
+    String query =
+        "SELECT column1, column2, column3, column6, column7, column10 FROM testTable WHERE column10 > 674022574 AND "
+            + "column1 > 100000000 AND column2 BETWEEN 20000000 AND 1000000000 AND column3 <> 'w' AND (column6 < "
+            + "500000 OR column7 NOT IN (225, 407)) AND daysSinceEpoch = 1756015683 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 1184L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 1384L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 913464L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+    DataSchema dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT
+    });
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows1 = resultTable.getRows();
+
+ changePropertiesAndReloadSegment();
+
+ // Run the same query again.
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 1184L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 1384L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 250896L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+    dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.INT
+    });
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows2 = resultTable.getRows();
+
+ validateBeforeAfterQueryResults(resultRows1, resultRows2);
+ }
+
+ @Test
+ public void testSelectWithDistinctQueriesAfterReload()
+ throws Exception {
+ String query = "SELECT DISTINCT column1, column2, column3, column6,
column7, column10 FROM testTable ORDER BY "
+ + "column1 LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 2400000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    DataSchema dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+    });
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows1 = resultTable.getRows();
+
+ changePropertiesAndReloadSegment();
+
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 2400000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    dataSchema = new DataSchema(new String[]{
+        "column1", "column2", "column3", "column6", "column7", "column10"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+    });
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows2 = resultTable.getRows();
+
+ validateBeforeAfterQueryResults(resultRows1, resultRows2);
+ }
+
+ @Test
+ public void testSelectWithGroupByOrderByQueriesWithReload()
+ throws Exception {
+    String query =
+        "SELECT column1, column7 FROM testTable GROUP BY column1, column7 ORDER BY column1, column7 " + " LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 800000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column7"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}));
+ List<Object[]> resultRows1 = resultTable.getRows();
+ int previousVal = -1;
+ for (Object[] resultRow : resultRows1) {
+ assertEquals(resultRow.length, 2);
+ assertTrue((int) resultRow[0] >= previousVal);
+ previousVal = (int) resultRow[0];
+ }
+
+ changePropertiesAndReloadSegment();
+
+ brokerResponseNative = getBrokerResponse(query);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 800000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column7"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}));
+ List<Object[]> resultRows2 = resultTable.getRows();
+ previousVal = -1;
+ for (Object[] resultRow : resultRows2) {
+ assertEquals(resultRow.length, 2);
+ assertTrue((int) resultRow[0] >= previousVal);
+ previousVal = (int) resultRow[0];
+ }
+
+ validateBeforeAfterQueryResults(resultRows1, resultRows2);
+ }
+
+ @Test
+ public void testSelectWithAggregationQueriesWithReload()
+ throws Exception {
+ // TEST 1.
+ String query = "SELECT MAX(ARRAYLENGTH(column7)) from testTable LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400000);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(arraylength(column7))"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> beforeResultRows1 = resultTable.getRows();
+
+ // TEST2
+    query =
+        "SELECT MAX(column1), MIN(column1), MAX(column2), MIN(column2), MAXMV(column6), MINMV(column6), MAXMV"
+            + "(column7), MINMV(column7), MAX(column10), MIN(column10) FROM testTable";
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{
+        "max(column1)", "min(column1)",
+        "max" + "(column2)", "min(column2)", "maxmv(column6)", "minmv(column6)",
+        "maxmv" + "(column7)", "minmv(column7)", "max(column10)", "min(column10)"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE
+    }));
+ List<Object[]> beforeResultRows2 = resultTable.getRows();
+
+ // TEST3
+ query = "SELECT column1, max(column1), sum(column10) from testTable WHERE
column7 = 2147483647 GROUP BY "
+ + "column1 ORDER BY column1";
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 199_756L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 399_512L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 536360L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(),
+        new DataSchema(new String[]{"column1", "max(column1)", "sum(column10)"}, new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+        }));
+ List<Object[]> beforeResultRows3 = resultTable.getRows();
+
+ changePropertiesAndReloadSegment();
+
+ query = "SELECT MAX(ARRAYLENGTH(column7)) from testTable LIMIT 10";
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400000);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(arraylength(column7))"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> afterResultRows1 = resultTable.getRows();
+
+ validateBeforeAfterQueryResults(beforeResultRows1, afterResultRows1);
+
+    query =
+        "SELECT MAX(column1), MIN(column1), MAX(column2), MIN(column2), MAXMV(column6), MINMV(column6), MAXMV"
+            + "(column7), MINMV(column7), MAX(column10), MIN(column10) FROM testTable";
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{
+        "max(column1)", "min(column1)",
+        "max" + "(column2)", "min(column2)", "maxmv(column6)", "minmv(column6)",
+        "maxmv" + "(column7)", "minmv(column7)", "max(column10)", "min(column10)"
+    }, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+        DataSchema.ColumnDataType.DOUBLE
+    }));
+ List<Object[]> afterResultRows2 = resultTable.getRows();
+
+ validateBeforeAfterQueryResults(beforeResultRows2, afterResultRows2);
+
+ query = "SELECT column1, max(column1), sum(column10) from testTable WHERE
column7 = 2147483647 GROUP BY "
+ + "column1 ORDER BY column1";
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 199_756L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+    assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 399_512L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+    assertEquals(resultTable.getDataSchema(),
+        new DataSchema(new String[]{"column1", "max(column1)", "sum(column10)"}, new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+        }));
+ List<Object[]> afterResultRows3 = resultTable.getRows();
+
+ validateBeforeAfterQueryResults(beforeResultRows3, afterResultRows3);
+ }
+
+ @Test
+ public void testRangeIndexAfterReload()
+ throws Exception {
+ String query = "select count(*) from testTable where column10 > 674022574";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable1 = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 103280L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 400000L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "count(*)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG
+ });
+ assertEquals(resultTable1.getDataSchema(), dataSchema);
+ List<Object[]> resultRows1 = resultTable1.getRows();
+
+ changePropertiesAndReloadSegment();
+
+ brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ resultTable1 = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 103280L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+
+ dataSchema = new DataSchema(new String[]{
+ "count(*)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG
+ });
+ assertEquals(resultTable1.getDataSchema(), dataSchema);
+ List<Object[]> resultRows2 = resultTable1.getRows();
+
+ validateBeforeAfterQueryResults(resultRows1, resultRows2);
+ }
+
+  private void validateBeforeAfterQueryResults(List<Object[]> beforeResults, List<Object[]> afterResults) {
+ assertEquals(beforeResults.size(), afterResults.size());
+ for (int i = 0; i < beforeResults.size(); i++) {
+ Object[] resultRow1 = beforeResults.get(i);
+ Object[] resultRow2 = afterResults.get(i);
+ assertEquals(resultRow1.length, resultRow2.length);
+ for (int j = 0; j < resultRow1.length; j++) {
+ assertEquals(resultRow1[j], resultRow2[j]);
+ }
+ }
+ }
+
+  /**
+   * As a part of segmentReload, the ForwardIndexHandler will perform the following operations:
+   *
+   * column1 -> Change compression.
+   * column6 -> Change compression.
+   * column3 -> Enable dictionary.
+   * column2 -> Enable dictionary. Add inverted index.
+   * column7 -> Enable dictionary. Add inverted index.
+   * column10 -> Enable dictionary.
+   */
+ private void changePropertiesAndReloadSegment()
+ throws Exception {
+ List<FieldConfig> newFieldConfigs = new ArrayList<>();
+ newFieldConfigs.add(new FieldConfig("column1",
FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.ZSTANDARD, null));
+ newFieldConfigs.add(new FieldConfig("column6",
FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.LZ4, null));
+ _tableConfig.setFieldConfigList(newFieldConfigs);
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+ indexLoadingConfig.setTableConfig(_tableConfig);
+    Set<String> invertedIndexEnabledColumns = new HashSet<>(_invertedIndexColumns);
+ invertedIndexEnabledColumns.add("column2");
+ invertedIndexEnabledColumns.add("column7");
+ indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
+ Set<String> noDictionaryColumns = new HashSet<>(_noDictionaryColumns);
+ indexLoadingConfig.setNoDictionaryColumns(noDictionaryColumns);
+ indexLoadingConfig.getNoDictionaryColumns().remove("column2");
+ indexLoadingConfig.getNoDictionaryColumns().remove("column3");
+ indexLoadingConfig.getNoDictionaryColumns().remove("column7");
+ indexLoadingConfig.getNoDictionaryColumns().remove("column10");
+ Set<String> rangeIndexColumns = new HashSet<>(_rangeIndexColumns);
+ indexLoadingConfig.setRangeIndexColumns(rangeIndexColumns);
+ indexLoadingConfig.setReadMode(ReadMode.heap);
+
+ // Reload the segments to pick up the new configs
+ File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
+    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
+    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
+ _indexSegment = immutableSegment1;
+ _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+
+ // immutableSegment1 checks
+ assertNotNull(immutableSegment1.getForwardIndex("column1"));
+ assertNull(immutableSegment1.getDictionary("column1"));
+ assertNotNull(immutableSegment1.getForwardIndex("column2"));
+ assertNotNull(immutableSegment1.getDictionary("column2"));
+ assertNotNull(immutableSegment1.getForwardIndex("column3"));
+ assertNotNull(immutableSegment1.getDictionary("column3"));
+ assertNotNull(immutableSegment1.getForwardIndex("column6"));
+ assertNull(immutableSegment1.getDictionary("column6"));
+ assertNotNull(immutableSegment1.getForwardIndex("column7"));
+ assertNotNull(immutableSegment1.getDictionary("column7"));
+ assertNotNull(immutableSegment1.getForwardIndex("column10"));
+ assertNotNull(immutableSegment1.getDictionary("column10"));
+
+ // immutableSegment2 checks
+ assertNotNull(immutableSegment2.getForwardIndex("column1"));
+ assertNull(immutableSegment2.getDictionary("column1"));
+ assertNotNull(immutableSegment2.getForwardIndex("column2"));
+ assertNotNull(immutableSegment2.getDictionary("column2"));
+ assertNotNull(immutableSegment2.getForwardIndex("column3"));
+ assertNotNull(immutableSegment2.getDictionary("column3"));
+ assertNotNull(immutableSegment2.getForwardIndex("column6"));
+ assertNull(immutableSegment2.getDictionary("column6"));
+ assertNotNull(immutableSegment2.getForwardIndex("column7"));
+ assertNotNull(immutableSegment2.getDictionary("column7"));
+ assertNotNull(immutableSegment2.getForwardIndex("column10"));
+ assertNotNull(immutableSegment2.getDictionary("column10"));
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 9ec3804563..7a1d918199 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -152,12 +152,17 @@ public class ForwardIndexHandler implements IndexHandler {
for (String column : existingAllColumns) {
       if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
+ // Existing column is RAW. New column is dictionary enabled.
if (_schema == null || _indexLoadingConfig.getTableConfig() == null) {
// This can only happen in tests.
LOGGER.warn("Cannot enable dictionary for column={} as schema or
tableConfig is null.", column);
continue;
}
- // Existing column is RAW. New column is dictionary enabled.
+
+ // Note that RAW columns cannot be sorted.
+        ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+        Preconditions.checkState(!existingColMetadata.isSorted(), "Raw column=" + column + " cannot be sorted.");
+
columnOperationMap.put(column, Operation.ENABLE_DICTIONARY);
       } else if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) {
         // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed.
@@ -439,9 +444,8 @@ public class ForwardIndexHandler implements IndexHandler {
     File dictionaryFile = new File(indexDir, column + V1Constants.Dict.FILE_EXTENSION);
String fwdIndexFileExtension;
if (isSingleValue) {
-      fwdIndexFileExtension =
-          existingColMetadata.isSorted() ? V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION
-              : V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ // Raw columns cannot be sorted.
+      fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
} else {
       fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
}
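The net effect of the handler change above: enabling a dictionary on a raw column now fails fast
if the column metadata claims it is sorted (raw columns can never be sorted), which lets the
rewrite path assume the unsorted single-value file extension unconditionally. The tests exercise
this through the handler's computeOperation() API; a minimal sketch of that usage, borrowing
names from ForwardIndexHandlerTest below (illustrative, not part of the commit):

    // Remove a raw column from the no-dictionary set and ask the handler what
    // work the config change implies.
    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_MV_PASS_THROUGH_STRING);
    ForwardIndexHandler fwdIndexHandler =
        new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
    // A RAW -> dictionary transition maps to ENABLE_DICTIONARY; a sorted raw
    // column would instead trip the new Preconditions check.
    assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);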
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index f49b7755ce..5492e363d7 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -88,9 +89,6 @@ public class ForwardIndexHandlerTest {
private static final String DIM_ZSTANDARD_BYTES = "DIM_ZSTANDARD_BYTES";
private static final String DIM_LZ4_BYTES = "DIM_LZ4_BYTES";
- // Sorted column
-  private static final String DIM_PASS_THROUGH_SORTED_LONG = "DIM_PASS_THROUGH_SORTED_LONG";
-
// Dictionary columns
private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
@@ -124,8 +122,7 @@ public class ForwardIndexHandlerTest {
private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
       Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER, DIM_PASS_THROUGH_BYTES,
           METRIC_PASS_THROUGH_BIG_DECIMAL, METRIC_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_INTEGER,
-          DIM_MV_PASS_THROUGH_LONG, DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES,
-          DIM_PASS_THROUGH_SORTED_LONG);
+          DIM_MV_PASS_THROUGH_LONG, DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES);
private static final List<String> RAW_LZ4_INDEX_COLUMNS =
       Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, DIM_LZ4_BYTES, METRIC_LZ4_BIG_DECIMAL,
@@ -207,7 +204,6 @@ public class ForwardIndexHandlerTest {
.addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(DIM_PASS_THROUGH_SORTED_LONG, FieldSpec.DataType.LONG)
.addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
.addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
@@ -332,9 +328,6 @@ public class ForwardIndexHandlerTest {
row.putValue(DIM_MV_PASS_THROUGH_STRING, tempMVStringRows[i]);
row.putValue(DIM_MV_PASS_THROUGH_BYTES, tempMVByteRows[i]);
- // Sorted columns
- row.putValue(DIM_PASS_THROUGH_SORTED_LONG, (long) i);
-
rows.add(row);
}
return rows;
@@ -370,13 +363,6 @@ public class ForwardIndexHandlerTest {
operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
-    // TEST4: Enable dictionary for a sorted column.
-    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
-    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_PASS_THROUGH_SORTED_LONG);
-    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(DIM_PASS_THROUGH_SORTED_LONG), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
-
// TEST5: Enable dictionary for a dict column. Should be a No-op.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
@@ -398,7 +384,7 @@ public class ForwardIndexHandlerTest {
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap, Collections.EMPTY_MAP);
- // TEST8: Add text index and disable forward index.
+ // TEST8: Add text index and enable dictionary.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
indexLoadingConfig.getRangeIndexColumns().add(METRIC_LZ4_INTEGER);
indexLoadingConfig.getNoDictionaryColumns().remove(METRIC_LZ4_INTEGER);
@@ -465,7 +451,7 @@ public class ForwardIndexHandlerTest {
}
@Test(priority = 1)
- public void testRewriteRawForwardIndexForSingleColumn()
+ public void testChangeCompressionForSingleColumn()
throws Exception {
for (int i = 0; i < _noDictionaryColumns.size(); i++) {
       // For every noDictionaryColumn, change the compressionType to all available types, one by one.
@@ -501,6 +487,7 @@ public class ForwardIndexHandlerTest {
segmentLocalFSDirectory.close();
// Validation
+ testIndexExists(columnName, ColumnIndexType.FORWARD_INDEX);
validateIndexMap(columnName, false);
validateForwardIndex(columnName, newCompressionType);
@@ -517,7 +504,7 @@ public class ForwardIndexHandlerTest {
}
@Test(priority = 2)
- public void testRewriteRawForwardIndexForMultipleColumns()
+ public void testChangeCompressionForMultipleColumns()
throws Exception {
// Setup
     SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
@@ -559,6 +546,7 @@ public class ForwardIndexHandlerTest {
     // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
segmentLocalFSDirectory.close();
+ testIndexExists(column1, ColumnIndexType.FORWARD_INDEX);
validateIndexMap(column1, false);
validateForwardIndex(column1, newCompressionType);
     // Validate metadata properties. Nothing should change when a forwardIndex is rewritten for compressionType
@@ -569,6 +557,7 @@ public class ForwardIndexHandlerTest {
         metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
         metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue());
+ testIndexExists(column2, ColumnIndexType.FORWARD_INDEX);
validateIndexMap(column2, false);
validateForwardIndex(column2, newCompressionType);
metadata = existingSegmentMetadata.getColumnMetadataFor(column2);
@@ -601,6 +590,8 @@ public class ForwardIndexHandlerTest {
segmentLocalFSDirectory.close();
// Col1 validation.
+ testIndexExists(col1, ColumnIndexType.FORWARD_INDEX);
+ testIndexExists(col1, ColumnIndexType.DICTIONARY);
validateIndexMap(col1, true);
validateForwardIndex(col1, null);
     // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
@@ -619,6 +610,8 @@ public class ForwardIndexHandlerTest {
metadata.getMinValue(), metadata.getMaxValue());
// Col2 validation.
+ testIndexExists(col2, ColumnIndexType.FORWARD_INDEX);
+ testIndexExists(col2, ColumnIndexType.DICTIONARY);
validateIndexMap(col2, true);
validateForwardIndex(col2, null);
     // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
@@ -657,6 +650,8 @@ public class ForwardIndexHandlerTest {
       // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
segmentLocalFSDirectory.close();
+ testIndexExists(column, ColumnIndexType.FORWARD_INDEX);
+ testIndexExists(column, ColumnIndexType.DICTIONARY);
validateIndexMap(column, true);
validateForwardIndex(column, null);
@@ -735,11 +730,7 @@ public class ForwardIndexHandlerTest {
}
case LONG: {
if (isSingleValue) {
- if (isSorted) {
- assertEquals((long) val, rowIdx, columnName + " " + rowIdx +
" " + expectedCompressionType);
- } else {
- assertEquals((long) val, 1001L, columnName + " " + rowIdx +
" " + expectedCompressionType);
- }
+ assertEquals((long) val, 1001L, columnName + " " + rowIdx + "
" + expectedCompressionType);
} else {
Object[] values = (Object[]) val;
int length = values.length;
@@ -777,6 +768,15 @@ public class ForwardIndexHandlerTest {
}
}
+  private void testIndexExists(String columnName, ColumnIndexType indexType) throws Exception {
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader();
+
+ assertTrue(reader.hasIndexFor(columnName, indexType));
+ }
+
private void validateIndexMap(String columnName, boolean dictionaryEnabled)
throws IOException {
     // Panic validation to make sure all columns have only one forward index entry in index map.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 4ad93b8c6e..3a9eb19427 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -356,33 +356,66 @@ public class SegmentPreProcessorTest {
}
@Test
- public void testForwardIndexHandlerEnableDictionary()
+ public void testSimpleEnableDictionarySV()
throws Exception {
     // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
     // enabled columns
- _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
_indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW);
_indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
     // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns.
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
-    checkForwardIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26, null, true, 0,
-        DataType.STRING, 100000);
-    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
-        0, ChunkCompressionType.LZ4, false, DataType.STRING, 100000);
-    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
-        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4,
+        true, 0, DataType.STRING, 100000);
     validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW, 42242, 16, _schema, false, false, false, 0, true,
         0, ChunkCompressionType.LZ4, false, DataType.INT, 100000);
// Convert the segment to V3.
new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
-    // TEST 2: Run reload with no-changes.
-    checkForwardIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26, null, true, 0,
+    // TEST 2: Enable dictionary on EXISTING_STRING_COL_RAW
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, null, true, 0,
         DataType.STRING, 100000);
-    // TEST 3: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted index and text index. Reload code path
+    // TEST 3: Enable dictionary on EXISTING_INT_COL_RAW
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW, 42242, 16, _schema, false, true, false, 0, null, true, 0,
+        DataType.INT, 100000);
+ }
+
+ @Test
+ public void testSimpleEnableDictionaryMV()
+ throws Exception {
+    // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
+ // enabled columns
+ _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
+
+    // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns.
+    constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+        ChunkCompressionType.LZ4, false, 13, DataType.INT, 106688);
+
+ // Convert the segment to V3.
+ new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+
+    // TEST 2: Enable dictionary on EXISTING_INT_COL_RAW_MV
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, null, false, 13,
+        DataType.INT, 106688);
+ }
+
+ @Test
+ public void testEnableDictAndOtherIndexesSV()
+ throws Exception {
+    // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
+    // enabled columns
+ _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW);
+ _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
+    constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+ new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+
+    // TEST 1: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted index and text index. Reload code path
// will create dictionary, inverted index and text index.
_indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW);
_indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_STRING_COL_RAW);
@@ -394,7 +427,7 @@ public class SegmentPreProcessorTest {
     validateIndex(ColumnIndexType.TEXT_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, true, 0,
null, false, DataType.STRING, 100000);
-    // TEST4: EXISTING_STRING_COL_RAW. Enable dictionary on a raw column that already has text index.
+    // TEST 2: EXISTING_STRING_COL_RAW. Enable dictionary on a raw column that already has text index.
List<String> textIndexCols = new ArrayList<>();
textIndexCols.add(EXISTING_STRING_COL_RAW);
     constructV1Segment(Collections.emptyList(), textIndexCols, Collections.emptyList());
@@ -410,7 +443,7 @@ public class SegmentPreProcessorTest {
     // Add it back so that this column is not rewritten for the other tests below.
_indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
-    // TEST 5: EXISTING_INT_COL_RAW. Enable dictionary on a column that already has range index.
+    // TEST 3: EXISTING_INT_COL_RAW. Enable dictionary on a column that already has range index.
List<String> rangeIndexCols = new ArrayList<>();
rangeIndexCols.add(EXISTING_INT_COL_RAW);
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), rangeIndexCols);
@@ -427,14 +460,18 @@ public class SegmentPreProcessorTest {
null, false, DataType.INT, 100000);
     // Add it back so that this column is not rewritten for the other tests below.
_indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW);
+ }
-    // TEST 6: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. Also enable inverted index and range index.
+ @Test
+ public void testEnableDictAndOtherIndexesMV()
+ throws Exception {
+    // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary
+ // enabled columns
+ _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
-    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
- false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
- // Enable dictionary and inverted index.
+    // TEST 1: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. Also enable inverted index and range index.
_indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
_indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_INT_COL_RAW_MV);
_indexLoadingConfig.getRangeIndexColumns().add(EXISTING_INT_COL_RAW_MV);
@@ -444,6 +481,23 @@ public class SegmentPreProcessorTest {
false, 13, null, false, DataType.INT, 106688);
     validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0,
false, 13, null, false, DataType.INT, 106688);
+
+    // TEST 2: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column that already has range index.
+ List<String> rangeIndexCols = new ArrayList<>();
+ rangeIndexCols.add(EXISTING_INT_COL_RAW_MV);
+    constructV1Segment(Collections.emptyList(), Collections.emptyList(), rangeIndexCols);
+ new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+    validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+
+ // Enable dictionary.
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, null, false, 13,
+        DataType.INT, 106688);
+    validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0,
+        false, 13, null, false, DataType.INT, 106688);
}
@Test
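For reference, the recurring pattern in the reorganized SegmentPreProcessorTest: mutate the shared
IndexLoadingConfig, then let the existing checkForwardIndexCreation/validateIndex helpers confirm
the dictionary and any companion indexes after reload. Condensed from the hunks above
(illustrative only):

    // Enable dictionary plus inverted and range index on the same MV column.
    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV);
    _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_INT_COL_RAW_MV);
    _indexLoadingConfig.getRangeIndexColumns().add(EXISTING_INT_COL_RAW_MV);
    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, null, false, 13,
        DataType.INT, 106688);
    validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0,
        false, 13, null, false, DataType.INT, 106688);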
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]