This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f7067dbead Fix raw index conversion from v4 (#14171)
f7067dbead is described below

commit f7067dbead7aa0b7376105470daa51fb5f3e3028
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Oct 6 15:27:46 2024 -0700

    Fix raw index conversion from v4 (#14171)
---
 .../MultiNodesOfflineClusterIntegrationTest.java   |   9 +-
 .../tests/OfflineClusterIntegrationTest.java       | 170 +++++++++++++++++---
 .../BigDecimalColumnPreIndexStatsCollector.java    |  10 +-
 .../stats/BytesColumnPredIndexStatsCollector.java  |  10 +-
 .../stats/DoubleColumnPreIndexStatsCollector.java  |   5 +-
 .../stats/FloatColumnPreIndexStatsCollector.java   |   5 +-
 .../stats/IntColumnPreIndexStatsCollector.java     |   5 +-
 .../stats/LongColumnPreIndexStatsCollector.java    |   5 +-
 .../stats/MapColumnPreIndexStatsCollector.java     | 173 ++++++--------------
 .../stats/StringColumnPreIndexStatsCollector.java  |  10 +-
 .../segment/index/loader/ForwardIndexHandler.java  | 178 ++++++++++-----------
 .../segment/readers/PinotSegmentColumnReader.java  |   9 +-
 .../index/loader/ForwardIndexHandlerTest.java      |  78 +++++++++
 .../utils/builder/ControllerRequestURLBuilder.java |  39 ++++-
 14 files changed, 408 insertions(+), 298 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index 2a6054f6ba..a981a930ce 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -296,7 +296,14 @@ public class MultiNodesOfflineClusterIntegrationTest 
extends OfflineClusterInteg
   // Disabled because with multiple replicas, there is no guarantee that all 
replicas are reloaded
   @Test(enabled = false)
   @Override
-  public void testDefaultColumns(boolean useMultiStageQueryEngineg) {
+  public void testDefaultColumns(boolean useMultiStageQueryEngine) {
+    // Ignored
+  }
+
+  // Disabled because with multiple replicas, there is no guarantee that all 
replicas are reloaded
+  @Test(enabled = false)
+  @Override
+  public void testForwardIndexTriggering() {
     // Ignored
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 2bcfcabef1..97f39f66a4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -61,6 +61,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.instance.InstanceType;
@@ -191,8 +192,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     HelixConfigScope scope =
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
             .build();
-    _helixManager.getConfigAccessor().set(scope, 
CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
-        Integer.toString(12));
+    _helixManager.getConfigAccessor()
+        .set(scope, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, 
Integer.toString(12));
     startBrokers();
     startServers();
 
@@ -1295,7 +1296,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
-  @Test(dependsOnMethods = "testDefaultColumns")
+  @Test(dependsOnMethods = "testForwardIndexTriggering")
   public void testBloomFilterTriggering()
       throws Exception {
     long numTotalDocs = getCountStarResult();
@@ -1318,6 +1319,111 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
+  @Test(dependsOnMethods = "testDefaultColumns")
+  public void testForwardIndexTriggering()
+      throws Exception {
+    String column = "DestCityName";
+    JsonNode columnIndexSize = getColumnIndexSize(column);
+    assertTrue(columnIndexSize.has("dictionary"));
+    assertTrue(columnIndexSize.has("forward_index"));
+    double dictionarySize = columnIndexSize.get("dictionary").asDouble();
+    double forwardIndexSize = columnIndexSize.get("forward_index").asDouble();
+
+    // Convert 'DestCityName' to raw index
+    TableConfig tableConfig = getOfflineTableConfig();
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
+    assertNotNull(noDictionaryColumns);
+    noDictionaryColumns.add(column);
+    updateTableConfig(tableConfig);
+    long numTotalDocs = getCountStarResult();
+    reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+    columnIndexSize = getColumnIndexSize(column);
+    assertFalse(columnIndexSize.has("dictionary"));
+    assertTrue(columnIndexSize.has("forward_index"));
+    double v2rawIndexSize = columnIndexSize.get("forward_index").asDouble();
+    assertTrue(v2rawIndexSize > forwardIndexSize);
+
+    // NOTE: Currently Pinot doesn't support directly changing raw index 
version, so we need to first reset it back to
+    //       dictionary encoding.
+    // TODO: Support it
+    resetForwardIndex(dictionarySize, forwardIndexSize);
+
+    // Convert 'DestCityName' to v4 raw index
+    List<FieldConfig> fieldConfigs = tableConfig.getFieldConfigList();
+    assertNotNull(fieldConfigs);
+    ForwardIndexConfig forwardIndexConfig = new 
ForwardIndexConfig.Builder().withRawIndexWriterVersion(4).build();
+    ObjectNode indexes = JsonUtils.newObjectNode();
+    indexes.set("forward", forwardIndexConfig.toJsonNode());
+    FieldConfig fieldConfig =
+        new 
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+    fieldConfigs.add(fieldConfig);
+    updateTableConfig(tableConfig);
+    reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+    columnIndexSize = getColumnIndexSize(column);
+    assertFalse(columnIndexSize.has("dictionary"));
+    assertTrue(columnIndexSize.has("forward_index"));
+    double v4RawIndexSize = columnIndexSize.get("forward_index").asDouble();
+    assertTrue(v4RawIndexSize < v2rawIndexSize && v4RawIndexSize > 
forwardIndexSize);
+
+    // Convert 'DestCityName' to SNAPPY compression
+    forwardIndexConfig =
+        new 
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.SNAPPY).withRawIndexWriterVersion(4)
+            .build();
+    indexes.set("forward", forwardIndexConfig.toJsonNode());
+    fieldConfig =
+        new 
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+    fieldConfigs.set(fieldConfigs.size() - 1, fieldConfig);
+    updateTableConfig(tableConfig);
+    reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+    columnIndexSize = getColumnIndexSize(column);
+    assertFalse(columnIndexSize.has("dictionary"));
+    assertTrue(columnIndexSize.has("forward_index"));
+    double v4SnappyRawIndexSize = 
columnIndexSize.get("forward_index").asDouble();
+    assertTrue(v4SnappyRawIndexSize > v2rawIndexSize);
+
+    // Removing FieldConfig should be no-op because compression is not 
explicitly set
+    fieldConfigs.remove(fieldConfigs.size() - 1);
+    updateTableConfig(tableConfig);
+    reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+    columnIndexSize = getColumnIndexSize(column);
+    assertFalse(columnIndexSize.has("dictionary"));
+    assertTrue(columnIndexSize.has("forward_index"));
+    assertEquals(columnIndexSize.get("forward_index").asDouble(), 
v4SnappyRawIndexSize);
+
+    // Adding 'LZ4' compression explicitly should trigger the conversion
+    forwardIndexConfig = new 
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).build();
+    indexes.set("forward", forwardIndexConfig.toJsonNode());
+    fieldConfig =
+        new 
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+    fieldConfigs.add(fieldConfig);
+    updateTableConfig(tableConfig);
+    reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+    columnIndexSize = getColumnIndexSize(column);
+    assertFalse(columnIndexSize.has("dictionary"));
+    assertTrue(columnIndexSize.has("forward_index"));
+    assertEquals(columnIndexSize.get("forward_index").asDouble(), 
v2rawIndexSize);
+
+    resetForwardIndex(dictionarySize, forwardIndexSize);
+  }
+
+  private JsonNode getColumnIndexSize(String column)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(
+            
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
 List.of(column))))
+        .get("columnIndexSizeMap").get(column);
+  }
+
+  private void resetForwardIndex(double expectedDictionarySize, double 
expectedForwardIndexSize)
+      throws Exception {
+    TableConfig tableConfig = createOfflineTableConfig();
+    updateTableConfig(tableConfig);
+    reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult());
+    JsonNode columnIndexSize = getColumnIndexSize("DestCityName");
+    assertEquals(columnIndexSize.get("dictionary").asDouble(), 
expectedDictionarySize);
+    assertEquals(columnIndexSize.get("forward_index").asDouble(), 
expectedForwardIndexSize);
+  }
+
   /**
    * Check if server returns error response quickly without timing out Broker.
    */
@@ -1617,10 +1723,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     // Verify the index sizes
     JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest(
-            getControllerBaseApiUrl() + 
"/tables/mytable/metadata?columns=DivAirportSeqIDs"
-                + 
"&columns=NewAddedDerivedDivAirportSeqIDs&columns=NewAddedDerivedDivAirportSeqIDsString"
-                + 
"&columns=NewAddedRawDerivedStringDimension&columns=NewAddedRawDerivedMVIntDimension"
-                + "&columns=NewAddedDerivedNullString"))
+            
_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
+                List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs", 
"NewAddedDerivedDivAirportSeqIDsString",
+                    "NewAddedRawDerivedStringDimension", 
"NewAddedRawDerivedMVIntDimension",
+                    "NewAddedDerivedNullString"))))
         .get("columnIndexSizeMap");
     assertEquals(columnIndexSizeMap.size(), 6);
     JsonNode originalColumnIndexSizes = 
columnIndexSizeMap.get("DivAirportSeqIDs");
@@ -1675,7 +1781,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     // Trigger reload
     reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
     JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
-        
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
 "*")));
+        
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
 List.of("*"))));
     assertEquals(segmentsMetadata.size(), 12);
     for (JsonNode segmentMetadata : segmentsMetadata) {
       assertEquals(segmentMetadata.get("columns").size(), 75);
@@ -1701,6 +1807,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     double numTotalDocsInDouble = (double) numTotalDocs;
 
     // Test queries with each new added columns
+    //@formatter:off
     String h2Query = "SELECT COUNT(*) FROM mytable";
     String pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric 
= 1";
     testQuery(pinotQuery, h2Query);
@@ -1847,13 +1954,15 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     pinotQuery = "SELECT SUM(NewAddedLongMetric) FROM mytable WHERE 
DaysSinceEpoch > 16312";
     h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch > 16312";
     testQuery(pinotQuery, h2Query);
+    //@formatter:on
 
     // Test other query forms with new added columns
-    pinotQuery =
-        "SELECT "
-            + (useMultiStageQueryEngine() ? 
"arrayToMV(NewAddedMVStringDimension)" : "NewAddedMVStringDimension")
-            + ", SUM(NewAddedFloatMetric) FROM mytable GROUP BY "
-            + (useMultiStageQueryEngine() ? 
"arrayToMV(NewAddedMVStringDimension)" : "NewAddedMVStringDimension");
+    //@formatter:off
+    pinotQuery = "SELECT "
+        + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVStringDimension)" 
: "NewAddedMVStringDimension")
+        + ", SUM(NewAddedFloatMetric) FROM mytable GROUP BY "
+        + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVStringDimension)" 
: "NewAddedMVStringDimension");
+    //@formatter:on
     response = postQuery(pinotQuery);
     rows = response.get("resultTable").get("rows");
     assertEquals(rows.size(), 1);
@@ -1864,9 +1973,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     // The multi-stage query engine doesn't support BIG_DECIMAL data type 
currently.
     if (!useMultiStageQueryEngine()) {
-      pinotQuery =
-          "SELECT NewAddedSVBytesDimension, SUM(NewAddedBigDecimalMetric) FROM 
mytable "
-              + "GROUP BY NewAddedSVBytesDimension";
+      pinotQuery = "SELECT NewAddedSVBytesDimension, 
SUM(NewAddedBigDecimalMetric) FROM mytable "
+          + "GROUP BY NewAddedSVBytesDimension";
       response = postQuery(pinotQuery);
       rows = response.get("resultTable").get("rows");
       assertEquals(rows.size(), 1);
@@ -1876,11 +1984,12 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       assertEquals(row.get(1).asDouble(), 0.0);
     }
 
-    pinotQuery =
-        "SELECT "
-            + (useMultiStageQueryEngine() ? 
"arrayToMV(NewAddedMVLongDimension)" : "NewAddedMVLongDimension")
-            + ", SUM(NewAddedIntMetric) FROM mytable GROUP BY "
-            + (useMultiStageQueryEngine() ? 
"arrayToMV(NewAddedMVLongDimension)" : "NewAddedMVLongDimension");
+    //@formatter:off
+    pinotQuery = "SELECT "
+        + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVLongDimension)" : 
"NewAddedMVLongDimension")
+        + ", SUM(NewAddedIntMetric) FROM mytable GROUP BY "
+        + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVLongDimension)" : 
"NewAddedMVLongDimension");
+    //@formatter:on
     response = postQuery(pinotQuery);
     rows = response.get("resultTable").get("rows");
     assertEquals(rows.size(), 1);
@@ -1889,8 +1998,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(row.get(0).asLong(), Long.MIN_VALUE);
     assertEquals(row.get(1).asDouble(), numTotalDocsInDouble);
 
+    //@formatter:off
     String newAddedDimensions = useMultiStageQueryEngine()
-        ? "arrayToMV(NewAddedMVIntDimension), 
arrayToMV(NewAddedMVLongDimension), arrayToMV(NewAddedMVFloatDimension), "
+        ?
+        "arrayToMV(NewAddedMVIntDimension), 
arrayToMV(NewAddedMVLongDimension), arrayToMV(NewAddedMVFloatDimension), "
             + "arrayToMV(NewAddedMVDoubleDimension), 
arrayToMV(NewAddedMVBooleanDimension), "
             + "arrayToMV(NewAddedMVTimestampDimension), 
arrayToMV(NewAddedMVStringDimension), "
             + "NewAddedSVJSONDimension, NewAddedSVBytesDimension"
@@ -1898,6 +2009,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         "NewAddedMVIntDimension, NewAddedMVLongDimension, 
NewAddedMVFloatDimension, NewAddedMVDoubleDimension, "
             + "NewAddedMVBooleanDimension, NewAddedMVTimestampDimension, 
NewAddedMVStringDimension, "
             + "NewAddedSVJSONDimension, NewAddedSVBytesDimension";
+    //@formatter:on
     pinotQuery = "SELECT " + newAddedDimensions + ", SUM(NewAddedIntMetric), 
SUM(NewAddedLongMetric), "
         + "SUM(NewAddedFloatMetric), SUM(NewAddedDoubleMetric) FROM mytable 
GROUP BY " + newAddedDimensions;
     response = postQuery(pinotQuery);
@@ -2111,7 +2223,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testGroupByUDFV1() throws Exception {
+  public void testGroupByUDFV1()
+      throws Exception {
     setUseMultiStageQueryEngine(false);
     String query = "SELECT add(DaysSinceEpoch,DaysSinceEpoch,15), COUNT(*) 
FROM mytable "
         + "GROUP BY add(DaysSinceEpoch,DaysSinceEpoch,15) ORDER BY COUNT(*) 
DESC";
@@ -2154,7 +2267,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testGroupByUDFV2() throws Exception {
+  public void testGroupByUDFV2()
+      throws Exception {
     setUseMultiStageQueryEngine(true);
     String query = "SELECT add(DaysSinceEpoch,add(DaysSinceEpoch,15)), 
COUNT(*) FROM mytable "
         + "GROUP BY add(DaysSinceEpoch,add(DaysSinceEpoch,15)) ORDER BY 
COUNT(*) DESC";
@@ -2259,6 +2373,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         "select DaysSinceEpoch, count(*) as num_trips from mytable GROUP BY 
DaysSinceEpoch order by DaysSinceEpoch";
     JsonNode tmpTableResult = 
postQuery(tmpTableQuery).get("resultTable").get("rows");
 
+    //@formatter:off
     String query = "WITH tmp AS (\n"
         + "  select count(*) as num_trips, DaysSinceEpoch  from mytable GROUP 
BY DaysSinceEpoch\n"
         + ")\n"
@@ -2270,6 +2385,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         + "    num_trips - LAG(num_trips, 2) OVER (ORDER BY DaysSinceEpoch) AS 
difference\n"
         + "FROM\n"
         + "    tmp";
+    //@formatter:on
     JsonNode response = postQuery(query);
     JsonNode resultTable = response.get("resultTable");
     
assertEquals(resultTable.get("dataSchema").get("columnDataTypes").toString(),
@@ -2295,6 +2411,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       assertEquals(row.get(1).asLong() - row.get(2).asLong(), 
row.get(3).asLong());
     }
 
+    //@formatter:off
     query = "WITH tmp AS (\n"
         + "  select count(*) as num_trips, DaysSinceEpoch  from mytable GROUP 
BY DaysSinceEpoch\n"
         + ")\n"
@@ -2306,6 +2423,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         + "    num_trips - LAG(num_trips, '2') OVER (ORDER BY DaysSinceEpoch) 
AS difference\n"
         + "FROM\n"
         + "    tmp";
+    //@formatter:on
     response = postQuery(query);
     resultTable = response.get("resultTable");
     
assertEquals(resultTable.get("dataSchema").get("columnDataTypes").toString(),
@@ -3121,11 +3239,13 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       throws Exception {
     setUseMultiStageQueryEngine(true);
     // language=sql
+    //@formatter:off
     String query1 = "EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR "
         + "SELECT count(*) AS count, Carrier AS name "
         + "FROM mytable "
         + "GROUP BY name "
         + "ORDER BY 1";
+    //@formatter:on
     String response1 = postQuery(query1).get("resultTable").toString();
 
     // Replace string "docs:[0-9]+" with "docs:*" so that test doesn't fail 
when number of documents change. This is
@@ -3548,7 +3668,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
   @Test(dataProvider = "useBothQueryEngines")
   public void testFilteredAggregationWithGroupByOrdering(boolean 
useMultiStageQueryEngine)
-    throws Exception {
+      throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
 
     // Test the ordering is correctly applied to the correct aggregation (the 
one without FILTER clause)
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
index cb1fb992b3..bd4506e2cf 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
@@ -93,10 +93,7 @@ public class BigDecimalColumnPreIndexStatsCollector extends 
AbstractColumnStatis
 
   @Override
   public int getLengthOfLargestElement() {
-    if (_sealed) {
-      return _maxLength;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for longest value");
+    return _maxLength;
   }
 
   @Override
@@ -106,10 +103,7 @@ public class BigDecimalColumnPreIndexStatsCollector 
extends AbstractColumnStatis
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index 34cb0ecc1e..0fca8a3c77 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -105,10 +105,7 @@ public class BytesColumnPredIndexStatsCollector extends 
AbstractColumnStatistics
 
   @Override
   public int getLengthOfLargestElement() {
-    if (_sealed) {
-      return _maxLength;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for longest value");
+    return _maxLength;
   }
 
   @Override
@@ -118,10 +115,7 @@ public class BytesColumnPredIndexStatsCollector extends 
AbstractColumnStatistics
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
index 6e9de875b8..9edd89b027 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class DoubleColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
index ce060c7fe4..4be248124e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class FloatColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsC
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
index 5e3bd03320..29678f6805 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class IntColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
index 0e17dd7439..3d4c63d03b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class LongColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCo
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
index 478ab27889..5198031a65 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
-import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
-import java.util.HashMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
+import java.util.Arrays;
 import java.util.Map;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -44,26 +44,19 @@ import 
org.apache.pinot.spi.utils.builder.TableConfigBuilder;
  * Assumptions that are made:
  * 1. Each key has a single type for the value's associated with it across all 
documents.
  * 2. At this point in the  Pinot process, the type consistency of a key 
should already be enforced, so if a
- * heterogenous value types for a key are encountered will constructing the 
Map statistics it can be raised as a
- * fault.
+ * heterogeneous value types for a key are encountered while constructing the Map 
statistics it can be raised as a fault.
  */
 public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
-  private ObjectOpenHashSet<String> _keys = new 
ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
-  private final HashMap<String, AbstractColumnStatisticsCollector> _keyStats;
+  private final Object2ObjectOpenHashMap<String, 
AbstractColumnStatisticsCollector> _keyStats =
+      new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
   private String[] _sortedValues;
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
-  private int _maxRowLength = 0;
-  private String _minValue = null;
-  private String _maxValue = null;
   private boolean _sealed = false;
-  private final String _column;
 
   public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
     super(column, statsCollectorConfig);
-    super._sorted = false;
-    _keyStats = new HashMap<>();
-    _column = column;
+    _sorted = false;
   }
 
   public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
@@ -75,40 +68,24 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
     assert !_sealed;
 
     if (entry instanceof Map) {
-      final Map<String, Object> mapValue = (Map<String, Object>) entry;
-      byte[] serializeMap = MapUtils.serializeMap(mapValue);
+      //noinspection unchecked
+      Map<String, Object> mapValue = (Map<String, Object>) entry;
+      int length = MapUtils.serializeMap(mapValue).length;
+      _minLength = Math.min(_minLength, length);
+      _maxLength = Math.max(_maxLength, length);
 
-      _maxRowLength = Math.max(_maxRowLength, serializeMap.length);
       for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
-        final String key = mapValueEntry.getKey();
-
-        // Record statistics about the key
-        int length = serializeMap.length;
-        if (_keys.add(key)) {
+        String key = mapValueEntry.getKey();
+        Object value = mapValueEntry.getValue();
+        AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
+        if (keyStats == null) {
+          keyStats = createKeyStatsCollector(key, value);
+          _keyStats.put(key, keyStats);
           if (isPartitionEnabled()) {
             updatePartition(key);
           }
-          if (_minValue == null) {
-            _minValue = key;
-          } else {
-            if (key.compareTo(_minValue) < 0) {
-              _minValue = key;
-            }
-          }
-          if (_maxValue == null) {
-            _maxValue = key;
-          } else {
-            if (key.compareTo(_maxValue) > 0) {
-              _maxValue = key;
-            }
-          }
-          _minLength = Math.min(_minLength, length);
-          _maxLength = Math.max(_maxLength, length);
         }
-
-        // Record statistics about the value within the key
-        AbstractColumnStatisticsCollector keyStats = 
getOrCreateKeyStatsCollector(key, mapValueEntry.getValue());
-        keyStats.collect(mapValueEntry.getValue());
+        keyStats.collect(value);
       }
       _totalNumberOfEntries++;
     } else {
@@ -119,7 +96,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   @Override
   public String getMinValue() {
     if (_sealed) {
-      return _minValue;
+      return _sortedValues[0];
     }
     throw new IllegalStateException("you must seal the collector first before 
asking for min value");
   }
@@ -127,7 +104,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   @Override
   public String getMaxValue() {
     if (_sealed) {
-      return _maxValue;
+      return _sortedValues[_sortedValues.length - 1];
     }
     throw new IllegalStateException("you must seal the collector first before 
asking for max value");
   }
@@ -147,30 +124,24 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
 
   @Override
   public int getLengthOfLargestElement() {
-    if (_sealed) {
-      return _maxLength;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for longest value");
+    return _maxLength;
   }
 
   @Override
   public int getMaxRowLengthInBytes() {
-    return _maxRowLength;
+    return _maxLength;
   }
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _keyStats.size();
   }
 
   @Override
   public void seal() {
     if (!_sealed) {
-      _sortedValues = _keys.stream().sorted().toArray(String[]::new);
-      _keys = null;
+      _sortedValues = _keyStats.keySet().toArray(new String[0]);
+      Arrays.sort(_sortedValues);
 
       // Iterate through every key stats collector and seal them
       for (AbstractColumnStatisticsCollector keyStatsCollector : 
_keyStats.values()) {
@@ -186,79 +157,31 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
    *
    * NOTE: this could raise an issue if there are millions of keys with very 
few values (Sparse keys, in other words).
    * So a less memory intensive option may be better for this.
-   *
-   * @param key
-   * @param value
-   * @return
    */
-  private AbstractColumnStatisticsCollector 
getOrCreateKeyStatsCollector(String key, Object value) {
-    // Check if the key stats collector exists just return it
-    if (!_keyStats.containsKey(key)) {
-      // Get the type of the value
-      PinotDataType type = PinotDataType.getSingleValueType(value.getClass());
-      TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
-          .setTableName(_column)
-          .build();
-      Schema keySchema = new Schema.SchemaBuilder()
-          .setSchemaName(key)
-          .addField(new DimensionFieldSpec(key, convertToDataType(type), 
false))
-          .build();
-      StatsCollectorConfig config = new StatsCollectorConfig(tableConfig, 
keySchema, null);
-
-      AbstractColumnStatisticsCollector keyStatsCollector = null;
-      switch (type) {
-        case INTEGER:
-          keyStatsCollector = new IntColumnPreIndexStatsCollector(key, config);
-          break;
-        case LONG:
-          keyStatsCollector = new LongColumnPreIndexStatsCollector(key, 
config);
-          break;
-        case FLOAT:
-          keyStatsCollector = new FloatColumnPreIndexStatsCollector(key, 
config);
-          break;
-        case DOUBLE:
-          keyStatsCollector = new DoubleColumnPreIndexStatsCollector(key, 
config);
-          break;
-        case BIG_DECIMAL:
-          keyStatsCollector = new BigDecimalColumnPreIndexStatsCollector(key, 
config);
-          break;
-        case STRING:
-          keyStatsCollector = new StringColumnPreIndexStatsCollector(key, 
config);
-          break;
-        case TIMESTAMP:
-        case BOOLEAN:
-        case BYTE:
-        case CHARACTER:
-        case SHORT:
-        case JSON:
-        case BYTES:
-        case OBJECT:
-        case MAP:
-        case BYTE_ARRAY:
-        case CHARACTER_ARRAY:
-        case SHORT_ARRAY:
-        case PRIMITIVE_INT_ARRAY:
-        case INTEGER_ARRAY:
-        case PRIMITIVE_LONG_ARRAY:
-        case LONG_ARRAY:
-        case PRIMITIVE_FLOAT_ARRAY:
-        case FLOAT_ARRAY:
-        case PRIMITIVE_DOUBLE_ARRAY:
-        case DOUBLE_ARRAY:
-        case BOOLEAN_ARRAY:
-        case TIMESTAMP_ARRAY:
-        case STRING_ARRAY:
-        case BYTES_ARRAY:
-        case COLLECTION:
-        case OBJECT_ARRAY:
-        default:
-          throw new UnsupportedOperationException(String.format("MAP column 
does not yet support '%s'", type));
-      }
-
-      _keyStats.put(key, keyStatsCollector);
+  private AbstractColumnStatisticsCollector createKeyStatsCollector(String 
key, Object value) {
+    // Get the type of the value
+    PinotDataType type = PinotDataType.getSingleValueType(value.getClass());
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(key).build();
+    Schema keySchema = new Schema.SchemaBuilder().setSchemaName(key)
+        .addField(new DimensionFieldSpec(key, convertToDataType(type), 
false)).build();
+    StatsCollectorConfig config = new StatsCollectorConfig(tableConfig, 
keySchema, null);
+
+    switch (type) {
+      case INTEGER:
+        return new IntColumnPreIndexStatsCollector(key, config);
+      case LONG:
+        return new LongColumnPreIndexStatsCollector(key, config);
+      case FLOAT:
+        return new FloatColumnPreIndexStatsCollector(key, config);
+      case DOUBLE:
+        return new DoubleColumnPreIndexStatsCollector(key, config);
+      case BIG_DECIMAL:
+        return new BigDecimalColumnPreIndexStatsCollector(key, config);
+      case STRING:
+        return new StringColumnPreIndexStatsCollector(key, config);
+      default:
+        throw new UnsupportedOperationException(String.format("MAP column does 
not yet support '%s'", type));
     }
-
-    return _keyStats.get(key);
   }
 
   static FieldSpec.DataType convertToDataType(PinotDataType ty) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index d606c784d2..d43adbe3c7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -125,10 +125,7 @@ public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
 
   @Override
   public int getLengthOfLargestElement() {
-    if (_sealed) {
-      return _maxLength;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for longest value");
+    return _maxLength;
   }
 
   @Override
@@ -138,10 +135,7 @@ public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
 
   @Override
   public int getCardinality() {
-    if (_sealed) {
-      return _sortedValues.length;
-    }
-    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+    return _sealed ? _sortedValues.length : _values.size();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 4e3ab129c2..3b654a7ebc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.loader;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,6 +66,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -277,9 +279,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
 
         // If the dictionary is not enabled on the existing column it must be 
on the new noDictionary column list.
         // Cannot enable the dictionary for a column with forward index 
disabled.
-        Preconditions.checkState(existingHasDict || !newIsDict,
-            String.format("Cannot regenerate the dictionary for column %s with 
forward index disabled. Please "
-                + "refresh or back-fill the data to add back the forward 
index", column));
+        Preconditions.checkState(existingHasDict || !newIsDict, String.format(
+            "Cannot regenerate the dictionary for column %s with forward index 
disabled. Please refresh or back-fill "
+                + "the data to add back the forward index", column));
 
         if (existingHasDict && !newIsDict) {
           // Dictionary is currently enabled on this column but is supposed to 
be disabled. Remove the dictionary
@@ -459,27 +461,33 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
       SegmentDirectory.Writer segmentWriter)
       throws Exception {
     try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, 
columnMetadata)) {
-      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
-      IndexCreationContext context;
-      if (columnMetadata.isSingleValue()) {
-        context = 
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
-            .withLengthOfLongestEntry(lengthOfLongestEntry).build();
-      } else {
-        // For VarByte MV columns like String and Bytes, the storage 
representation of each row contains the following
-        // components:
-        // 1. bytes required to store the actual elements of the MV row (A)
-        // 2. bytes required to store the number of elements in the MV row (B)
-        // 3. bytes required to store the length of each MV element (C)
-        //
-        // lengthOfLongestEntry = A + B + C
-        // maxRowLengthInBytes = A
-        int maxNumValuesPerEntry = columnMetadata.getMaxNumberOfMultiValues();
-        int maxRowLengthInBytes =
-            
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
 maxNumValuesPerEntry);
-        context = 
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
-            
.withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build();
+      IndexCreationContext.Builder builder =
+          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata);
+      // Set entry length info for raw index creators. No need to set this 
when changing dictionary id compression type.
+      if (!reader.isDictionaryEncoded() && 
!columnMetadata.getDataType().getStoredType().isFixedWidth()) {
+        int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+        if (lengthOfLongestEntry < 0) {
+          // When this info is not available from the reader, we need to scan 
the column.
+          lengthOfLongestEntry = getMaxRowLength(columnMetadata, reader, null);
+        }
+        if (columnMetadata.isSingleValue()) {
+          builder.withLengthOfLongestEntry(lengthOfLongestEntry);
+        } else {
+          // For VarByte MV columns like String and Bytes, the storage 
representation of each row contains the following
+          // components:
+          // 1. bytes required to store the actual elements of the MV row (A)
+          // 2. bytes required to store the number of elements in the MV row 
(B)
+          // 3. bytes required to store the length of each MV element (C)
+          //
+          // lengthOfLongestEntry = A + B + C
+          // maxRowLengthInBytes = A
+          int maxNumValuesPerEntry = 
columnMetadata.getMaxNumberOfMultiValues();
+          int maxRowLengthInBytes =
+              
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
 maxNumValuesPerEntry);
+          builder.withMaxRowLengthInBytes(maxRowLengthInBytes);
+        }
       }
-
+      IndexCreationContext context = builder.build();
       ForwardIndexConfig config = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
       try (ForwardIndexCreator creator = 
StandardIndexes.forward().createIndexCreator(context, config)) {
         if (!reader.getStoredType().equals(creator.getValueType())) {
@@ -645,7 +653,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
       Dictionary dictionaryReader) {
     C readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
-    FieldSpec.DataType storedType = 
dictionaryReader.getValueType().getStoredType();
+    DataType storedType = dictionaryReader.getValueType().getStoredType();
 
     switch (storedType) {
       case INT: {
@@ -876,10 +884,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
       SegmentDirectory.Writer segmentWriter, File indexDir, 
SegmentDictionaryCreator dictionaryCreator)
       throws Exception {
     try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, 
existingColMetadata)) {
-      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
       IndexCreationContext.Builder builder =
-          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
-              .withLengthOfLongestEntry(lengthOfLongestEntry);
+          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata);
       // existingColMetadata has dictEnable=false. Overwrite the value.
       builder.withDictionary(true);
       IndexCreationContext context = builder.build();
@@ -923,7 +929,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     }
 
     LOGGER.info("Creating raw forward index for segment={} and column={}", 
segmentName, column);
-    rewriteDictToRawForwardIndex(column, existingColMetadata, segmentWriter, 
indexDir);
+    rewriteDictToRawForwardIndex(existingColMetadata, segmentWriter, indexDir);
 
     // Remove dictionary and forward index
     segmentWriter.removeIndex(column, StandardIndexes.forward());
@@ -946,100 +952,80 @@ public class ForwardIndexHandler extends 
BaseIndexHandler {
     LOGGER.info("Created raw based forward index for segment: {}, column: {}", 
segmentName, column);
   }
 
-  private void rewriteDictToRawForwardIndex(String column, ColumnMetadata 
existingColMetadata,
-      SegmentDirectory.Writer segmentWriter, File indexDir)
+  private void rewriteDictToRawForwardIndex(ColumnMetadata columnMetadata, 
SegmentDirectory.Writer segmentWriter,
+      File indexDir)
       throws Exception {
-    try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, 
existingColMetadata)) {
-      Dictionary dictionary = DictionaryIndexType.read(segmentWriter, 
existingColMetadata);
+    String column = columnMetadata.getColumnName();
+    try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, 
columnMetadata)) {
+      Dictionary dictionary = DictionaryIndexType.read(segmentWriter, 
columnMetadata);
       IndexCreationContext.Builder builder =
-          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata);
+          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata);
       builder.withDictionary(false);
-
-      if (existingColMetadata.isSingleValue()) {
-        // lengthOfLongestEntry is available for dict columns from metadata.
-        int lengthOfLongestEntry = existingColMetadata.getColumnMaxLength();
-        builder.withLengthOfLongestEntry(lengthOfLongestEntry);
-      } else {
-        FieldSpec.DataType dataType = existingColMetadata.getDataType();
-        boolean isFixedWidth = dataType.getStoredType().isFixedWidth();
-
-        if (!isFixedWidth) {
-          // For variable length stored types, maxRowLengthInBytes is required 
to create the forwardIndexCreator.
-          // This can only be determined by reading the entire MV forward 
index.
-          int maxRowLength = getMaxRowLengthForMVColumn(column, reader, 
dictionary);
-          builder.withMaxRowLengthInBytes(maxRowLength);
+      if (!columnMetadata.getDataType().getStoredType().isFixedWidth()) {
+        if (columnMetadata.isSingleValue()) {
+          // lengthOfLongestEntry is available for dict columns from metadata.
+          
builder.withLengthOfLongestEntry(columnMetadata.getColumnMaxLength());
+        } else {
+          // maxRowLength can only be determined by scanning the column.
+          builder.withMaxRowLengthInBytes(getMaxRowLength(columnMetadata, 
reader, dictionary));
         }
       }
-
       IndexCreationContext context = builder.build();
       ForwardIndexConfig config = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-
       try (ForwardIndexCreator creator = 
StandardIndexes.forward().createIndexCreator(context, config)) {
-        int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexRewriteHelper(column, existingColMetadata, reader, 
creator, numDocs, null, dictionary);
+        forwardIndexRewriteHelper(column, columnMetadata, reader, creator, 
columnMetadata.getTotalDocs(), null,
+            dictionary);
       }
     }
   }
 
-  private int getMaxRowLengthForMVColumn(String column, ForwardIndexReader<?> 
reader, Dictionary dictionary)
-      throws Exception {
-    ColumnMetadata existingColMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
-    AbstractColumnStatisticsCollector statsCollector =
-        getStatsCollector(column, dictionary.getValueType().getStoredType());
-    PinotSegmentColumnReader columnReader =
-        new PinotSegmentColumnReader(reader, dictionary, null, 
existingColMetadata.getMaxNumberOfMultiValues());
-    int numDocs = existingColMetadata.getTotalDocs();
-
-    
Preconditions.checkState(!existingColMetadata.getDataType().getStoredType().isFixedWidth(),
-        "Column " + column + "is not a fixed width column.");
-    Preconditions.checkState(!existingColMetadata.isSingleValue(), "Column " + 
column + "is not MV.");
-
-    for (int i = 0; i < numDocs; i++) {
-      Object obj = columnReader.getValue(i);
-      statsCollector.collect(obj);
+  /**
+   * Returns the max row length for a column.
+   * - For SV column, this is the length of the longest value.
+   * - For MV column, this is the length of the longest MV entry (sum of 
lengths of all elements).
+   */
+  private int getMaxRowLength(ColumnMetadata columnMetadata, 
ForwardIndexReader<?> forwardIndex,
+      @Nullable Dictionary dictionary)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+    DataType storedType = columnMetadata.getDataType().getStoredType();
+    assert !storedType.isFixedWidth();
+
+    try (PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(forwardIndex, dictionary, null,
+        columnMetadata.getMaxNumberOfMultiValues())) {
+      AbstractColumnStatisticsCollector statsCollector = 
getStatsCollector(column, storedType);
+      int numDocs = columnMetadata.getTotalDocs();
+      for (int i = 0; i < numDocs; i++) {
+        statsCollector.collect(columnReader.getValue(i));
+      }
+      // NOTE: No need to seal the stats collector because value length is 
updated while collecting stats.
+      return columnMetadata.isSingleValue() ? 
statsCollector.getLengthOfLargestElement()
+          : statsCollector.getMaxRowLengthInBytes();
     }
-
-    statsCollector.seal();
-    return statsCollector.getMaxRowLengthInBytes();
   }
 
-  private AbstractColumnStatisticsCollector getStatsCollector(String column, 
FieldSpec.DataType storedType)
-      throws Exception {
+  private AbstractColumnStatisticsCollector getStatsCollector(String column, 
DataType storedType) {
     StatsCollectorConfig statsCollectorConfig = new 
StatsCollectorConfig(_tableConfig, _schema, null);
-    AbstractColumnStatisticsCollector statsCollector;
-
-    // TODO(Vivek): Check if checking stored type will be problematic if 
column has dictionary. ie will dictionary
-    //  return INT as the stored type.
     switch (storedType) {
       case INT:
-        statsCollector = new IntColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new IntColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
       case LONG:
-        statsCollector = new LongColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new LongColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
       case FLOAT:
-        statsCollector = new FloatColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new FloatColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
       case DOUBLE:
-        statsCollector = new DoubleColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new DoubleColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+      case BIG_DECIMAL:
+        return new BigDecimalColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
       case STRING:
-        statsCollector = new StringColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new StringColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
       case BYTES:
-        statsCollector = new BytesColumnPredIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new BytesColumnPredIndexStatsCollector(column, 
statsCollectorConfig);
       case MAP:
-        statsCollector = new MapColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
-      case BIG_DECIMAL:
-        statsCollector = new BigDecimalColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
-        break;
+        return new MapColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
       default:
-        throw new IllegalStateException("Unsupported storedType=" + 
storedType.toString() + " for column=" + column);
+        throw new IllegalStateException("Unsupported storedType=" + storedType 
+ " for column=" + column);
     }
-
-    return statsCollector;
   }
 
   private boolean hasIndex(String column, IndexType<?, ?, ?> indexType) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 93bc517dff..0125777777 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -39,7 +39,6 @@ public class PinotSegmentColumnReader implements Closeable {
   private final Dictionary _dictionary;
   private final NullValueVectorReader _nullValueVectorReader;
   private final int[] _dictIdBuffer;
-  private final int _maxNumValuesPerMVEntry;
 
   public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
     DataSource dataSource = indexSegment.getDataSource(column);
@@ -51,11 +50,10 @@ public class PinotSegmentColumnReader implements Closeable {
     _nullValueVectorReader = dataSource.getNullValueVector();
     if (_forwardIndexReader.isSingleValue()) {
       _dictIdBuffer = null;
-      _maxNumValuesPerMVEntry = -1;
     } else {
-      _maxNumValuesPerMVEntry = 
dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
-      Preconditions.checkState(_maxNumValuesPerMVEntry >= 0, 
"maxNumValuesPerMVEntry is negative for an MV column.");
-      _dictIdBuffer = new int[_maxNumValuesPerMVEntry];
+      int maxNumValuesPerMVEntry = 
dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
+      Preconditions.checkState(maxNumValuesPerMVEntry >= 0, 
"maxNumValuesPerMVEntry is negative for an MV column.");
+      _dictIdBuffer = new int[maxNumValuesPerMVEntry];
     }
   }
 
@@ -65,7 +63,6 @@ public class PinotSegmentColumnReader implements Closeable {
     _forwardIndexReaderContext = _forwardIndexReader.createContext();
     _dictionary = dictionary;
     _nullValueVectorReader = nullValueVectorReader;
-    _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
     if (_forwardIndexReader.isSingleValue()) {
       _dictIdBuffer = null;
     } else {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 2a3040cd20..2de3f12390 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.loader;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -48,6 +49,7 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -62,6 +64,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterMethod;
@@ -1017,6 +1020,81 @@ public class ForwardIndexHandlerTest {
     }
   }
 
+  @Test
+  public void testChangeCompressionAndIndexVersion()
+      throws Exception {
+    List<String> columns = new ArrayList<>(RAW_SNAPPY_COLUMNS.size() + 
RAW_SORTED_COLUMNS.size());
+    columns.addAll(RAW_SNAPPY_COLUMNS);
+    columns.addAll(RAW_SORTED_COLUMNS);
+    for (String column : columns) {
+      // Convert from SNAPPY v2 to LZ4 v4
+      SegmentMetadataImpl existingSegmentMetadata;
+      FieldConfig existingFieldConfig;
+      try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+          SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+        _segmentDirectory = segmentDirectory;
+        _writer = writer;
+
+        existingSegmentMetadata = segmentDirectory.getSegmentMetadata();
+        ForwardIndexConfig forwardIndexConfig =
+            new 
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).withRawIndexWriterVersion(4)
+                .build();
+        ObjectNode indexes = JsonUtils.newObjectNode();
+        indexes.set("forward", forwardIndexConfig.toJsonNode());
+        FieldConfig fieldConfig =
+            new 
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+        existingFieldConfig = _fieldConfigMap.put(column, fieldConfig);
+        assertNotNull(existingFieldConfig);
+        ForwardIndexHandler handler = createForwardIndexHandler();
+        assertTrue(handler.needUpdateIndices(writer));
+        handler.updateIndices(writer);
+        handler.postUpdateIndicesCleanup(writer);
+        // Tear down before validation. Because columns.psf and index map 
cleanup happens at segmentDirectory.close()
+      }
+
+      // Validation
+      ColumnMetadata metadata = 
existingSegmentMetadata.getColumnMetadataFor(column);
+      testIndexExists(column, StandardIndexes.forward());
+      validateIndexMap(column, false, false);
+      validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
+
+      // Validate metadata properties. Nothing should change when a 
forwardIndex is rewritten for compressionType
+      // change.
+      validateMetadataProperties(column, metadata.hasDictionary(), 
metadata.getColumnMaxLength(),
+          metadata.getCardinality(), metadata.getTotalDocs(), 
metadata.getDataType(), metadata.getFieldType(),
+          metadata.isSorted(), metadata.isSingleValue(), 
metadata.getMaxNumberOfMultiValues(),
+          metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
+          metadata.getMaxValue(), false);
+
+      // Convert it back
+      try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+          SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+        _segmentDirectory = segmentDirectory;
+        _writer = writer;
+        _fieldConfigMap.put(column, existingFieldConfig);
+        ForwardIndexHandler handler = createForwardIndexHandler();
+        assertTrue(handler.needUpdateIndices(writer));
+        handler.updateIndices(writer);
+        handler.postUpdateIndicesCleanup(writer);
+        // Tear down before validation. Because columns.psf and index map 
cleanup happens at segmentDirectory.close()
+      }
+
+      // Validation
+      metadata = existingSegmentMetadata.getColumnMetadataFor(column);
+      testIndexExists(column, StandardIndexes.forward());
+      validateIndexMap(column, false, false);
+      validateForwardIndex(column, CompressionCodec.SNAPPY, 
metadata.isSorted());
+
+      // Validate metadata properties. Nothing should change when a 
forwardIndex is rewritten for compressionType
+      // change.
+      validateMetadataProperties(column, metadata.hasDictionary(), 
metadata.getColumnMaxLength(),
+          metadata.getCardinality(), metadata.getTotalDocs(), 
metadata.getDataType(), metadata.getFieldType(),
+          metadata.isSorted(), metadata.isSingleValue(), 
metadata.getMaxNumberOfMultiValues(),
+          metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), 
metadata.getMinValue(),
+          metadata.getMaxValue(), false);
+    }
+  }
+
   @Test
   public void testChangeDictCompression()
       throws Exception {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 19bed50b68..98a2a5aefe 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -170,8 +171,7 @@ public class ControllerRequestURLBuilder {
   }
 
   public String forTenantInstancesToggle(String tenant, String tenantType, String state) {
-    return StringUtil.join("/", _baseUrl, "tenants", tenant)
-        + "?type=" + tenantType + "&state=" + state;
+    return StringUtil.join("/", _baseUrl, "tenants", tenant) + "?type=" + tenantType + "&state=" + state;
   }
 
   public String forLiveBrokerTablesGet() {
@@ -297,6 +297,31 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "tables", tableName, "externalview");
   }
 
+  public String forTableAggregateMetadata(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "metadata");
+  }
+
+  public String forTableAggregateMetadata(String tableName, @Nullable List<String> columns) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "metadata") + constructColumnsParameter(columns);
+  }
+
+  private String constructColumnsParameter(@Nullable List<String> columns) {
+    if (!CollectionUtils.isEmpty(columns)) {
+      StringBuilder parameter = new StringBuilder();
+      parameter.append("?columns=");
+      parameter.append(columns.get(0));
+      int numColumns = columns.size();
+      if (numColumns > 1) {
+        for (int i = 1; i < numColumns; i++) {
+          parameter.append("&columns=").append(columns.get(i));
+        }
+      }
+      return parameter.toString();
+    } else {
+      return "";
+    }
+  }
+
   public String forSchemaValidate() {
     return StringUtil.join("/", _baseUrl, "schemas", "validate");
   }
@@ -368,9 +393,10 @@ public class ControllerRequestURLBuilder {
   }
 
   public String forSegmentsMetadataFromServer(String tableName) {
-    return forSegmentsMetadataFromServer(tableName, null);
+    return forSegmentsMetadataFromServer(tableName, (List<String>) null);
   }
 
+  @Deprecated
   public String forSegmentsMetadataFromServer(String tableName, @Nullable String columns) {
     String url = StringUtil.join("/", _baseUrl, "segments", tableName, "metadata");
     if (columns != null) {
@@ -379,6 +405,10 @@ public class ControllerRequestURLBuilder {
     return url;
   }
 
+  public String forSegmentsMetadataFromServer(String tableName, @Nullable List<String> columns) {
+    return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") + constructColumnsParameter(columns);
+  }
+
   public String forSegmentMetadata(String tableName, String segmentName) {
     return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName), "metadata");
   }
@@ -441,8 +471,7 @@ public class ControllerRequestURLBuilder {
 
   public String forDeleteMultipleSegments(String tableName, String tableType, List<String> segments) {
     StringBuilder fullUrl = new StringBuilder(
-        StringUtil.join("?", StringUtil.join("/", _baseUrl, "segments", tableName),
-             "type=" + tableType));
+        StringUtil.join("?", StringUtil.join("/", _baseUrl, "segments", tableName), "type=" + tableType));
     for (String segment : segments) {
       fullUrl.append("&segments=").append(segment);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to