This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f7067dbead Fix raw index conversion from v4 (#14171)
f7067dbead is described below
commit f7067dbead7aa0b7376105470daa51fb5f3e3028
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Oct 6 15:27:46 2024 -0700
Fix raw index conversion from v4 (#14171)
---
.../MultiNodesOfflineClusterIntegrationTest.java | 9 +-
.../tests/OfflineClusterIntegrationTest.java | 170 +++++++++++++++++---
.../BigDecimalColumnPreIndexStatsCollector.java | 10 +-
.../stats/BytesColumnPredIndexStatsCollector.java | 10 +-
.../stats/DoubleColumnPreIndexStatsCollector.java | 5 +-
.../stats/FloatColumnPreIndexStatsCollector.java | 5 +-
.../stats/IntColumnPreIndexStatsCollector.java | 5 +-
.../stats/LongColumnPreIndexStatsCollector.java | 5 +-
.../stats/MapColumnPreIndexStatsCollector.java | 173 ++++++--------------
.../stats/StringColumnPreIndexStatsCollector.java | 10 +-
.../segment/index/loader/ForwardIndexHandler.java | 178 ++++++++++-----------
.../segment/readers/PinotSegmentColumnReader.java | 9 +-
.../index/loader/ForwardIndexHandlerTest.java | 78 +++++++++
.../utils/builder/ControllerRequestURLBuilder.java | 39 ++++-
14 files changed, 408 insertions(+), 298 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index 2a6054f6ba..a981a930ce 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -296,7 +296,14 @@ public class MultiNodesOfflineClusterIntegrationTest
extends OfflineClusterInteg
// Disabled because with multiple replicas, there is no guarantee that all
replicas are reloaded
@Test(enabled = false)
@Override
- public void testDefaultColumns(boolean useMultiStageQueryEngineg) {
+ public void testDefaultColumns(boolean useMultiStageQueryEngine) {
+ // Ignored
+ }
+
+ // Disabled because with multiple replicas, there is no guarantee that all
replicas are reloaded
+ @Test(enabled = false)
+ @Override
+ public void testForwardIndexTriggering() {
// Ignored
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 2bcfcabef1..97f39f66a4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -61,6 +61,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -191,8 +192,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
HelixConfigScope scope =
new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
.build();
- _helixManager.getConfigAccessor().set(scope,
CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
- Integer.toString(12));
+ _helixManager.getConfigAccessor()
+ .set(scope, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
Integer.toString(12));
startBrokers();
startServers();
@@ -1295,7 +1296,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(getTableSize(getTableName()), _tableSize);
}
- @Test(dependsOnMethods = "testDefaultColumns")
+ @Test(dependsOnMethods = "testForwardIndexTriggering")
public void testBloomFilterTriggering()
throws Exception {
long numTotalDocs = getCountStarResult();
@@ -1318,6 +1319,111 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(getTableSize(getTableName()), _tableSize);
}
+ @Test(dependsOnMethods = "testDefaultColumns")
+ public void testForwardIndexTriggering()
+ throws Exception {
+ String column = "DestCityName";
+ JsonNode columnIndexSize = getColumnIndexSize(column);
+ assertTrue(columnIndexSize.has("dictionary"));
+ assertTrue(columnIndexSize.has("forward_index"));
+ double dictionarySize = columnIndexSize.get("dictionary").asDouble();
+ double forwardIndexSize = columnIndexSize.get("forward_index").asDouble();
+
+ // Convert 'DestCityName' to raw index
+ TableConfig tableConfig = getOfflineTableConfig();
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
+ assertNotNull(noDictionaryColumns);
+ noDictionaryColumns.add(column);
+ updateTableConfig(tableConfig);
+ long numTotalDocs = getCountStarResult();
+ reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+ columnIndexSize = getColumnIndexSize(column);
+ assertFalse(columnIndexSize.has("dictionary"));
+ assertTrue(columnIndexSize.has("forward_index"));
+ double v2rawIndexSize = columnIndexSize.get("forward_index").asDouble();
+ assertTrue(v2rawIndexSize > forwardIndexSize);
+
+ // NOTE: Currently Pinot doesn't support directly changing raw index
version, so we need to first reset it back to
+ // dictionary encoding.
+ // TODO: Support it
+ resetForwardIndex(dictionarySize, forwardIndexSize);
+
+ // Convert 'DestCityName' to v4 raw index
+ List<FieldConfig> fieldConfigs = tableConfig.getFieldConfigList();
+ assertNotNull(fieldConfigs);
+ ForwardIndexConfig forwardIndexConfig = new
ForwardIndexConfig.Builder().withRawIndexWriterVersion(4).build();
+ ObjectNode indexes = JsonUtils.newObjectNode();
+ indexes.set("forward", forwardIndexConfig.toJsonNode());
+ FieldConfig fieldConfig =
+ new
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+ fieldConfigs.add(fieldConfig);
+ updateTableConfig(tableConfig);
+ reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+ columnIndexSize = getColumnIndexSize(column);
+ assertFalse(columnIndexSize.has("dictionary"));
+ assertTrue(columnIndexSize.has("forward_index"));
+ double v4RawIndexSize = columnIndexSize.get("forward_index").asDouble();
+ assertTrue(v4RawIndexSize < v2rawIndexSize && v4RawIndexSize >
forwardIndexSize);
+
+ // Convert 'DestCityName' to SNAPPY compression
+ forwardIndexConfig =
+ new
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.SNAPPY).withRawIndexWriterVersion(4)
+ .build();
+ indexes.set("forward", forwardIndexConfig.toJsonNode());
+ fieldConfig =
+ new
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+ fieldConfigs.set(fieldConfigs.size() - 1, fieldConfig);
+ updateTableConfig(tableConfig);
+ reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+ columnIndexSize = getColumnIndexSize(column);
+ assertFalse(columnIndexSize.has("dictionary"));
+ assertTrue(columnIndexSize.has("forward_index"));
+ double v4SnappyRawIndexSize =
columnIndexSize.get("forward_index").asDouble();
+ assertTrue(v4SnappyRawIndexSize > v2rawIndexSize);
+
+ // Removing FieldConfig should be no-op because compression is not
explicitly set
+ fieldConfigs.remove(fieldConfigs.size() - 1);
+ updateTableConfig(tableConfig);
+ reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+ columnIndexSize = getColumnIndexSize(column);
+ assertFalse(columnIndexSize.has("dictionary"));
+ assertTrue(columnIndexSize.has("forward_index"));
+ assertEquals(columnIndexSize.get("forward_index").asDouble(),
v4SnappyRawIndexSize);
+
+ // Adding 'LZ4' compression explicitly should trigger the conversion
+ forwardIndexConfig = new
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).build();
+ indexes.set("forward", forwardIndexConfig.toJsonNode());
+ fieldConfig =
+ new
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+ fieldConfigs.add(fieldConfig);
+ updateTableConfig(tableConfig);
+ reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+ columnIndexSize = getColumnIndexSize(column);
+ assertFalse(columnIndexSize.has("dictionary"));
+ assertTrue(columnIndexSize.has("forward_index"));
+ assertEquals(columnIndexSize.get("forward_index").asDouble(),
v2rawIndexSize);
+
+ resetForwardIndex(dictionarySize, forwardIndexSize);
+ }
+
+ private JsonNode getColumnIndexSize(String column)
+ throws Exception {
+ return JsonUtils.stringToJsonNode(
+
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
List.of(column))))
+ .get("columnIndexSizeMap").get(column);
+ }
+
+ private void resetForwardIndex(double expectedDictionarySize, double
expectedForwardIndexSize)
+ throws Exception {
+ TableConfig tableConfig = createOfflineTableConfig();
+ updateTableConfig(tableConfig);
+ reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult());
+ JsonNode columnIndexSize = getColumnIndexSize("DestCityName");
+ assertEquals(columnIndexSize.get("dictionary").asDouble(),
expectedDictionarySize);
+ assertEquals(columnIndexSize.get("forward_index").asDouble(),
expectedForwardIndexSize);
+ }
+
/**
* Check if server returns error response quickly without timing out Broker.
*/
@@ -1617,10 +1723,10 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Verify the index sizes
JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest(
- getControllerBaseApiUrl() +
"/tables/mytable/metadata?columns=DivAirportSeqIDs"
- +
"&columns=NewAddedDerivedDivAirportSeqIDs&columns=NewAddedDerivedDivAirportSeqIDsString"
- +
"&columns=NewAddedRawDerivedStringDimension&columns=NewAddedRawDerivedMVIntDimension"
- + "&columns=NewAddedDerivedNullString"))
+
_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
+ List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs",
"NewAddedDerivedDivAirportSeqIDsString",
+ "NewAddedRawDerivedStringDimension",
"NewAddedRawDerivedMVIntDimension",
+ "NewAddedDerivedNullString"))))
.get("columnIndexSizeMap");
assertEquals(columnIndexSizeMap.size(), 6);
JsonNode originalColumnIndexSizes =
columnIndexSizeMap.get("DivAirportSeqIDs");
@@ -1675,7 +1781,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Trigger reload
reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
-
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
"*")));
+
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
List.of("*"))));
assertEquals(segmentsMetadata.size(), 12);
for (JsonNode segmentMetadata : segmentsMetadata) {
assertEquals(segmentMetadata.get("columns").size(), 75);
@@ -1701,6 +1807,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
double numTotalDocsInDouble = (double) numTotalDocs;
// Test queries with each new added columns
+ //@formatter:off
String h2Query = "SELECT COUNT(*) FROM mytable";
String pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric
= 1";
testQuery(pinotQuery, h2Query);
@@ -1847,13 +1954,15 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
pinotQuery = "SELECT SUM(NewAddedLongMetric) FROM mytable WHERE
DaysSinceEpoch > 16312";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch > 16312";
testQuery(pinotQuery, h2Query);
+ //@formatter:on
// Test other query forms with new added columns
- pinotQuery =
- "SELECT "
- + (useMultiStageQueryEngine() ?
"arrayToMV(NewAddedMVStringDimension)" : "NewAddedMVStringDimension")
- + ", SUM(NewAddedFloatMetric) FROM mytable GROUP BY "
- + (useMultiStageQueryEngine() ?
"arrayToMV(NewAddedMVStringDimension)" : "NewAddedMVStringDimension");
+ //@formatter:off
+ pinotQuery = "SELECT "
+ + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVStringDimension)"
: "NewAddedMVStringDimension")
+ + ", SUM(NewAddedFloatMetric) FROM mytable GROUP BY "
+ + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVStringDimension)"
: "NewAddedMVStringDimension");
+ //@formatter:on
response = postQuery(pinotQuery);
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
@@ -1864,9 +1973,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// The multi-stage query engine doesn't support BIG_DECIMAL data type
currently.
if (!useMultiStageQueryEngine()) {
- pinotQuery =
- "SELECT NewAddedSVBytesDimension, SUM(NewAddedBigDecimalMetric) FROM
mytable "
- + "GROUP BY NewAddedSVBytesDimension";
+ pinotQuery = "SELECT NewAddedSVBytesDimension,
SUM(NewAddedBigDecimalMetric) FROM mytable "
+ + "GROUP BY NewAddedSVBytesDimension";
response = postQuery(pinotQuery);
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
@@ -1876,11 +1984,12 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(row.get(1).asDouble(), 0.0);
}
- pinotQuery =
- "SELECT "
- + (useMultiStageQueryEngine() ?
"arrayToMV(NewAddedMVLongDimension)" : "NewAddedMVLongDimension")
- + ", SUM(NewAddedIntMetric) FROM mytable GROUP BY "
- + (useMultiStageQueryEngine() ?
"arrayToMV(NewAddedMVLongDimension)" : "NewAddedMVLongDimension");
+ //@formatter:off
+ pinotQuery = "SELECT "
+ + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVLongDimension)" :
"NewAddedMVLongDimension")
+ + ", SUM(NewAddedIntMetric) FROM mytable GROUP BY "
+ + (useMultiStageQueryEngine() ? "arrayToMV(NewAddedMVLongDimension)" :
"NewAddedMVLongDimension");
+ //@formatter:on
response = postQuery(pinotQuery);
rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
@@ -1889,8 +1998,10 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(row.get(0).asLong(), Long.MIN_VALUE);
assertEquals(row.get(1).asDouble(), numTotalDocsInDouble);
+ //@formatter:off
String newAddedDimensions = useMultiStageQueryEngine()
- ? "arrayToMV(NewAddedMVIntDimension),
arrayToMV(NewAddedMVLongDimension), arrayToMV(NewAddedMVFloatDimension), "
+ ?
+ "arrayToMV(NewAddedMVIntDimension),
arrayToMV(NewAddedMVLongDimension), arrayToMV(NewAddedMVFloatDimension), "
+ "arrayToMV(NewAddedMVDoubleDimension),
arrayToMV(NewAddedMVBooleanDimension), "
+ "arrayToMV(NewAddedMVTimestampDimension),
arrayToMV(NewAddedMVStringDimension), "
+ "NewAddedSVJSONDimension, NewAddedSVBytesDimension"
@@ -1898,6 +2009,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
"NewAddedMVIntDimension, NewAddedMVLongDimension,
NewAddedMVFloatDimension, NewAddedMVDoubleDimension, "
+ "NewAddedMVBooleanDimension, NewAddedMVTimestampDimension,
NewAddedMVStringDimension, "
+ "NewAddedSVJSONDimension, NewAddedSVBytesDimension";
+ //@formatter:on
pinotQuery = "SELECT " + newAddedDimensions + ", SUM(NewAddedIntMetric),
SUM(NewAddedLongMetric), "
+ "SUM(NewAddedFloatMetric), SUM(NewAddedDoubleMetric) FROM mytable
GROUP BY " + newAddedDimensions;
response = postQuery(pinotQuery);
@@ -2111,7 +2223,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
@Test
- public void testGroupByUDFV1() throws Exception {
+ public void testGroupByUDFV1()
+ throws Exception {
setUseMultiStageQueryEngine(false);
String query = "SELECT add(DaysSinceEpoch,DaysSinceEpoch,15), COUNT(*)
FROM mytable "
+ "GROUP BY add(DaysSinceEpoch,DaysSinceEpoch,15) ORDER BY COUNT(*)
DESC";
@@ -2154,7 +2267,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
@Test
- public void testGroupByUDFV2() throws Exception {
+ public void testGroupByUDFV2()
+ throws Exception {
setUseMultiStageQueryEngine(true);
String query = "SELECT add(DaysSinceEpoch,add(DaysSinceEpoch,15)),
COUNT(*) FROM mytable "
+ "GROUP BY add(DaysSinceEpoch,add(DaysSinceEpoch,15)) ORDER BY
COUNT(*) DESC";
@@ -2259,6 +2373,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
"select DaysSinceEpoch, count(*) as num_trips from mytable GROUP BY
DaysSinceEpoch order by DaysSinceEpoch";
JsonNode tmpTableResult =
postQuery(tmpTableQuery).get("resultTable").get("rows");
+ //@formatter:off
String query = "WITH tmp AS (\n"
+ " select count(*) as num_trips, DaysSinceEpoch from mytable GROUP
BY DaysSinceEpoch\n"
+ ")\n"
@@ -2270,6 +2385,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
+ " num_trips - LAG(num_trips, 2) OVER (ORDER BY DaysSinceEpoch) AS
difference\n"
+ "FROM\n"
+ " tmp";
+ //@formatter:on
JsonNode response = postQuery(query);
JsonNode resultTable = response.get("resultTable");
assertEquals(resultTable.get("dataSchema").get("columnDataTypes").toString(),
@@ -2295,6 +2411,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(row.get(1).asLong() - row.get(2).asLong(),
row.get(3).asLong());
}
+ //@formatter:off
query = "WITH tmp AS (\n"
+ " select count(*) as num_trips, DaysSinceEpoch from mytable GROUP
BY DaysSinceEpoch\n"
+ ")\n"
@@ -2306,6 +2423,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
+ " num_trips - LAG(num_trips, '2') OVER (ORDER BY DaysSinceEpoch)
AS difference\n"
+ "FROM\n"
+ " tmp";
+ //@formatter:on
response = postQuery(query);
resultTable = response.get("resultTable");
assertEquals(resultTable.get("dataSchema").get("columnDataTypes").toString(),
@@ -3121,11 +3239,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
throws Exception {
setUseMultiStageQueryEngine(true);
// language=sql
+ //@formatter:off
String query1 = "EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR "
+ "SELECT count(*) AS count, Carrier AS name "
+ "FROM mytable "
+ "GROUP BY name "
+ "ORDER BY 1";
+ //@formatter:on
String response1 = postQuery(query1).get("resultTable").toString();
// Replace string "docs:[0-9]+" with "docs:*" so that test doesn't fail
when number of documents change. This is
@@ -3548,7 +3668,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
@Test(dataProvider = "useBothQueryEngines")
public void testFilteredAggregationWithGroupByOrdering(boolean
useMultiStageQueryEngine)
- throws Exception {
+ throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
// Test the ordering is correctly applied to the correct aggregation (the
one without FILTER clause)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
index cb1fb992b3..bd4506e2cf 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
@@ -93,10 +93,7 @@ public class BigDecimalColumnPreIndexStatsCollector extends
AbstractColumnStatis
@Override
public int getLengthOfLargestElement() {
- if (_sealed) {
- return _maxLength;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for longest value");
+ return _maxLength;
}
@Override
@@ -106,10 +103,7 @@ public class BigDecimalColumnPreIndexStatsCollector
extends AbstractColumnStatis
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index 34cb0ecc1e..0fca8a3c77 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -105,10 +105,7 @@ public class BytesColumnPredIndexStatsCollector extends
AbstractColumnStatistics
@Override
public int getLengthOfLargestElement() {
- if (_sealed) {
- return _maxLength;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for longest value");
+ return _maxLength;
}
@Override
@@ -118,10 +115,7 @@ public class BytesColumnPredIndexStatsCollector extends
AbstractColumnStatistics
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
index 6e9de875b8..9edd89b027 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class DoubleColumnPreIndexStatsCollector extends
AbstractColumnStatistics
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
index ce060c7fe4..4be248124e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class FloatColumnPreIndexStatsCollector extends
AbstractColumnStatisticsC
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
index 5e3bd03320..29678f6805 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class IntColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCol
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
index 0e17dd7439..3d4c63d03b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
@@ -101,10 +101,7 @@ public class LongColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCo
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
index 478ab27889..5198031a65 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
@@ -18,8 +18,8 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.stats;
-import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
-import java.util.HashMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
+import java.util.Arrays;
import java.util.Map;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -44,26 +44,19 @@ import
org.apache.pinot.spi.utils.builder.TableConfigBuilder;
* Assumptions that are made:
* 1. Each key has a single type for the value's associated with it across all
documents.
* 2. At this point in the Pinot process, the type consistency of a key
should already be enforced, so if a
- * heterogenous value types for a key are encountered will constructing the
Map statistics it can be raised as a
- * fault.
+ * heterogeneous value types for a key are encountered while constructing the Map
statistics, it can be raised as a fault.
*/
public class MapColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCollector {
- private ObjectOpenHashSet<String> _keys = new
ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
- private final HashMap<String, AbstractColumnStatisticsCollector> _keyStats;
+ private final Object2ObjectOpenHashMap<String,
AbstractColumnStatisticsCollector> _keyStats =
+ new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
private String[] _sortedValues;
private int _minLength = Integer.MAX_VALUE;
private int _maxLength = 0;
- private int _maxRowLength = 0;
- private String _minValue = null;
- private String _maxValue = null;
private boolean _sealed = false;
- private final String _column;
public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig
statsCollectorConfig) {
super(column, statsCollectorConfig);
- super._sorted = false;
- _keyStats = new HashMap<>();
- _column = column;
+ _sorted = false;
}
public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
@@ -75,40 +68,24 @@ public class MapColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCol
assert !_sealed;
if (entry instanceof Map) {
- final Map<String, Object> mapValue = (Map<String, Object>) entry;
- byte[] serializeMap = MapUtils.serializeMap(mapValue);
+ //noinspection unchecked
+ Map<String, Object> mapValue = (Map<String, Object>) entry;
+ int length = MapUtils.serializeMap(mapValue).length;
+ _minLength = Math.min(_minLength, length);
+ _maxLength = Math.max(_maxLength, length);
- _maxRowLength = Math.max(_maxRowLength, serializeMap.length);
for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
- final String key = mapValueEntry.getKey();
-
- // Record statistics about the key
- int length = serializeMap.length;
- if (_keys.add(key)) {
+ String key = mapValueEntry.getKey();
+ Object value = mapValueEntry.getValue();
+ AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
+ if (keyStats == null) {
+ keyStats = createKeyStatsCollector(key, value);
+ _keyStats.put(key, keyStats);
if (isPartitionEnabled()) {
updatePartition(key);
}
- if (_minValue == null) {
- _minValue = key;
- } else {
- if (key.compareTo(_minValue) < 0) {
- _minValue = key;
- }
- }
- if (_maxValue == null) {
- _maxValue = key;
- } else {
- if (key.compareTo(_maxValue) > 0) {
- _maxValue = key;
- }
- }
- _minLength = Math.min(_minLength, length);
- _maxLength = Math.max(_maxLength, length);
}
-
- // Record statistics about the value within the key
- AbstractColumnStatisticsCollector keyStats =
getOrCreateKeyStatsCollector(key, mapValueEntry.getValue());
- keyStats.collect(mapValueEntry.getValue());
+ keyStats.collect(value);
}
_totalNumberOfEntries++;
} else {
@@ -119,7 +96,7 @@ public class MapColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCol
@Override
public String getMinValue() {
if (_sealed) {
- return _minValue;
+ return _sortedValues[0];
}
throw new IllegalStateException("you must seal the collector first before
asking for min value");
}
@@ -127,7 +104,7 @@ public class MapColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCol
@Override
public String getMaxValue() {
if (_sealed) {
- return _maxValue;
+ return _sortedValues[_sortedValues.length - 1];
}
throw new IllegalStateException("you must seal the collector first before
asking for max value");
}
@@ -147,30 +124,24 @@ public class MapColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCol
@Override
public int getLengthOfLargestElement() {
- if (_sealed) {
- return _maxLength;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for longest value");
+ return _maxLength;
}
@Override
public int getMaxRowLengthInBytes() {
- return _maxRowLength;
+ return _maxLength;
}
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _keyStats.size();
}
@Override
public void seal() {
if (!_sealed) {
- _sortedValues = _keys.stream().sorted().toArray(String[]::new);
- _keys = null;
+ _sortedValues = _keyStats.keySet().toArray(new String[0]);
+ Arrays.sort(_sortedValues);
// Iterate through every key stats collector and seal them
for (AbstractColumnStatisticsCollector keyStatsCollector :
_keyStats.values()) {
@@ -186,79 +157,31 @@ public class MapColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCol
*
* NOTE: this could raise an issue if there are millions of keys with very
few values (Sparse keys, in other words).
* So a less memory intensive option may be better for this.
- *
- * @param key
- * @param value
- * @return
*/
- private AbstractColumnStatisticsCollector
getOrCreateKeyStatsCollector(String key, Object value) {
- // Check if the key stats collector exists just return it
- if (!_keyStats.containsKey(key)) {
- // Get the type of the value
- PinotDataType type = PinotDataType.getSingleValueType(value.getClass());
- TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
- .setTableName(_column)
- .build();
- Schema keySchema = new Schema.SchemaBuilder()
- .setSchemaName(key)
- .addField(new DimensionFieldSpec(key, convertToDataType(type),
false))
- .build();
- StatsCollectorConfig config = new StatsCollectorConfig(tableConfig,
keySchema, null);
-
- AbstractColumnStatisticsCollector keyStatsCollector = null;
- switch (type) {
- case INTEGER:
- keyStatsCollector = new IntColumnPreIndexStatsCollector(key, config);
- break;
- case LONG:
- keyStatsCollector = new LongColumnPreIndexStatsCollector(key,
config);
- break;
- case FLOAT:
- keyStatsCollector = new FloatColumnPreIndexStatsCollector(key,
config);
- break;
- case DOUBLE:
- keyStatsCollector = new DoubleColumnPreIndexStatsCollector(key,
config);
- break;
- case BIG_DECIMAL:
- keyStatsCollector = new BigDecimalColumnPreIndexStatsCollector(key,
config);
- break;
- case STRING:
- keyStatsCollector = new StringColumnPreIndexStatsCollector(key,
config);
- break;
- case TIMESTAMP:
- case BOOLEAN:
- case BYTE:
- case CHARACTER:
- case SHORT:
- case JSON:
- case BYTES:
- case OBJECT:
- case MAP:
- case BYTE_ARRAY:
- case CHARACTER_ARRAY:
- case SHORT_ARRAY:
- case PRIMITIVE_INT_ARRAY:
- case INTEGER_ARRAY:
- case PRIMITIVE_LONG_ARRAY:
- case LONG_ARRAY:
- case PRIMITIVE_FLOAT_ARRAY:
- case FLOAT_ARRAY:
- case PRIMITIVE_DOUBLE_ARRAY:
- case DOUBLE_ARRAY:
- case BOOLEAN_ARRAY:
- case TIMESTAMP_ARRAY:
- case STRING_ARRAY:
- case BYTES_ARRAY:
- case COLLECTION:
- case OBJECT_ARRAY:
- default:
- throw new UnsupportedOperationException(String.format("MAP column
does not yet support '%s'", type));
- }
-
- _keyStats.put(key, keyStatsCollector);
+ private AbstractColumnStatisticsCollector createKeyStatsCollector(String
key, Object value) {
+ // Get the type of the value
+ PinotDataType type = PinotDataType.getSingleValueType(value.getClass());
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(key).build();
+ Schema keySchema = new Schema.SchemaBuilder().setSchemaName(key)
+ .addField(new DimensionFieldSpec(key, convertToDataType(type),
false)).build();
+ StatsCollectorConfig config = new StatsCollectorConfig(tableConfig,
keySchema, null);
+
+ switch (type) {
+ case INTEGER:
+ return new IntColumnPreIndexStatsCollector(key, config);
+ case LONG:
+ return new LongColumnPreIndexStatsCollector(key, config);
+ case FLOAT:
+ return new FloatColumnPreIndexStatsCollector(key, config);
+ case DOUBLE:
+ return new DoubleColumnPreIndexStatsCollector(key, config);
+ case BIG_DECIMAL:
+ return new BigDecimalColumnPreIndexStatsCollector(key, config);
+ case STRING:
+ return new StringColumnPreIndexStatsCollector(key, config);
+ default:
+ throw new UnsupportedOperationException(String.format("MAP column does
not yet support '%s'", type));
}
-
- return _keyStats.get(key);
}
static FieldSpec.DataType convertToDataType(PinotDataType ty) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index d606c784d2..d43adbe3c7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -125,10 +125,7 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
@Override
public int getLengthOfLargestElement() {
- if (_sealed) {
- return _maxLength;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for longest value");
+ return _maxLength;
}
@Override
@@ -138,10 +135,7 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
@Override
public int getCardinality() {
- if (_sealed) {
- return _sortedValues.length;
- }
- throw new IllegalStateException("you must seal the collector first before
asking for cardinality");
+ return _sealed ? _sortedValues.length : _values.size();
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 4e3ab129c2..3b654a7ebc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.loader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
@@ -65,6 +66,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -277,9 +279,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
// If the dictionary is not enabled on the existing column it must be
on the new noDictionary column list.
// Cannot enable the dictionary for a column with forward index
disabled.
- Preconditions.checkState(existingHasDict || !newIsDict,
- String.format("Cannot regenerate the dictionary for column %s with
forward index disabled. Please "
- + "refresh or back-fill the data to add back the forward
index", column));
+ Preconditions.checkState(existingHasDict || !newIsDict, String.format(
+ "Cannot regenerate the dictionary for column %s with forward index
disabled. Please refresh or back-fill "
+ + "the data to add back the forward index", column));
if (existingHasDict && !newIsDict) {
// Dictionary is currently enabled on this column but is supposed to
be disabled. Remove the dictionary
@@ -459,27 +461,33 @@ public class ForwardIndexHandler extends BaseIndexHandler
{
SegmentDirectory.Writer segmentWriter)
throws Exception {
try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter,
columnMetadata)) {
- int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
- IndexCreationContext context;
- if (columnMetadata.isSingleValue()) {
- context =
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
- .withLengthOfLongestEntry(lengthOfLongestEntry).build();
- } else {
- // For VarByte MV columns like String and Bytes, the storage
representation of each row contains the following
- // components:
- // 1. bytes required to store the actual elements of the MV row (A)
- // 2. bytes required to store the number of elements in the MV row (B)
- // 3. bytes required to store the length of each MV element (C)
- //
- // lengthOfLongestEntry = A + B + C
- // maxRowLengthInBytes = A
- int maxNumValuesPerEntry = columnMetadata.getMaxNumberOfMultiValues();
- int maxRowLengthInBytes =
-
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
maxNumValuesPerEntry);
- context =
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
-
.withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build();
+ IndexCreationContext.Builder builder =
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata);
+ // Set entry length info for raw index creators. No need to set this
when changing dictionary id compression type.
+ if (!reader.isDictionaryEncoded() &&
!columnMetadata.getDataType().getStoredType().isFixedWidth()) {
+ int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+ if (lengthOfLongestEntry < 0) {
+ // When this info is not available from the reader, we need to scan
the column.
+ lengthOfLongestEntry = getMaxRowLength(columnMetadata, reader, null);
+ }
+ if (columnMetadata.isSingleValue()) {
+ builder.withLengthOfLongestEntry(lengthOfLongestEntry);
+ } else {
+ // For VarByte MV columns like String and Bytes, the storage
representation of each row contains the following
+ // components:
+ // 1. bytes required to store the actual elements of the MV row (A)
+ // 2. bytes required to store the number of elements in the MV row
(B)
+ // 3. bytes required to store the length of each MV element (C)
+ //
+ // lengthOfLongestEntry = A + B + C
+ // maxRowLengthInBytes = A
+ int maxNumValuesPerEntry =
columnMetadata.getMaxNumberOfMultiValues();
+ int maxRowLengthInBytes =
+
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
maxNumValuesPerEntry);
+ builder.withMaxRowLengthInBytes(maxRowLengthInBytes);
+ }
}
-
+ IndexCreationContext context = builder.build();
ForwardIndexConfig config =
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
try (ForwardIndexCreator creator =
StandardIndexes.forward().createIndexCreator(context, config)) {
if (!reader.getStoredType().equals(creator.getValueType())) {
@@ -645,7 +653,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
Dictionary dictionaryReader) {
C readerContext = reader.createContext();
boolean isSVColumn = reader.isSingleValue();
- FieldSpec.DataType storedType =
dictionaryReader.getValueType().getStoredType();
+ DataType storedType = dictionaryReader.getValueType().getStoredType();
switch (storedType) {
case INT: {
@@ -876,10 +884,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
SegmentDirectory.Writer segmentWriter, File indexDir,
SegmentDictionaryCreator dictionaryCreator)
throws Exception {
try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter,
existingColMetadata)) {
- int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
IndexCreationContext.Builder builder =
-
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
- .withLengthOfLongestEntry(lengthOfLongestEntry);
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata);
// existingColMetadata has dictEnable=false. Overwrite the value.
builder.withDictionary(true);
IndexCreationContext context = builder.build();
@@ -923,7 +929,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
}
LOGGER.info("Creating raw forward index for segment={} and column={}",
segmentName, column);
- rewriteDictToRawForwardIndex(column, existingColMetadata, segmentWriter,
indexDir);
+ rewriteDictToRawForwardIndex(existingColMetadata, segmentWriter, indexDir);
// Remove dictionary and forward index
segmentWriter.removeIndex(column, StandardIndexes.forward());
@@ -946,100 +952,80 @@ public class ForwardIndexHandler extends
BaseIndexHandler {
LOGGER.info("Created raw based forward index for segment: {}, column: {}",
segmentName, column);
}
- private void rewriteDictToRawForwardIndex(String column, ColumnMetadata
existingColMetadata,
- SegmentDirectory.Writer segmentWriter, File indexDir)
+ private void rewriteDictToRawForwardIndex(ColumnMetadata columnMetadata,
SegmentDirectory.Writer segmentWriter,
+ File indexDir)
throws Exception {
- try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter,
existingColMetadata)) {
- Dictionary dictionary = DictionaryIndexType.read(segmentWriter,
existingColMetadata);
+ String column = columnMetadata.getColumnName();
+ try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter,
columnMetadata)) {
+ Dictionary dictionary = DictionaryIndexType.read(segmentWriter,
columnMetadata);
IndexCreationContext.Builder builder =
-
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata);
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata);
builder.withDictionary(false);
-
- if (existingColMetadata.isSingleValue()) {
- // lengthOfLongestEntry is available for dict columns from metadata.
- int lengthOfLongestEntry = existingColMetadata.getColumnMaxLength();
- builder.withLengthOfLongestEntry(lengthOfLongestEntry);
- } else {
- FieldSpec.DataType dataType = existingColMetadata.getDataType();
- boolean isFixedWidth = dataType.getStoredType().isFixedWidth();
-
- if (!isFixedWidth) {
- // For variable length stored types, maxRowLengthInBytes is required
to create the forwardIndexCreator.
- // This can only be determined by reading the entire MV forward
index.
- int maxRowLength = getMaxRowLengthForMVColumn(column, reader,
dictionary);
- builder.withMaxRowLengthInBytes(maxRowLength);
+ if (!columnMetadata.getDataType().getStoredType().isFixedWidth()) {
+ if (columnMetadata.isSingleValue()) {
+ // lengthOfLongestEntry is available for dict columns from metadata.
+
builder.withLengthOfLongestEntry(columnMetadata.getColumnMaxLength());
+ } else {
+ // maxRowLength can only be determined by scanning the column.
+ builder.withMaxRowLengthInBytes(getMaxRowLength(columnMetadata,
reader, dictionary));
}
}
-
IndexCreationContext context = builder.build();
ForwardIndexConfig config =
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-
try (ForwardIndexCreator creator =
StandardIndexes.forward().createIndexCreator(context, config)) {
- int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexRewriteHelper(column, existingColMetadata, reader,
creator, numDocs, null, dictionary);
+ forwardIndexRewriteHelper(column, columnMetadata, reader, creator,
columnMetadata.getTotalDocs(), null,
+ dictionary);
}
}
}
- private int getMaxRowLengthForMVColumn(String column, ForwardIndexReader<?>
reader, Dictionary dictionary)
- throws Exception {
- ColumnMetadata existingColMetadata =
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
- AbstractColumnStatisticsCollector statsCollector =
- getStatsCollector(column, dictionary.getValueType().getStoredType());
- PinotSegmentColumnReader columnReader =
- new PinotSegmentColumnReader(reader, dictionary, null,
existingColMetadata.getMaxNumberOfMultiValues());
- int numDocs = existingColMetadata.getTotalDocs();
-
-
Preconditions.checkState(!existingColMetadata.getDataType().getStoredType().isFixedWidth(),
- "Column " + column + "is not a fixed width column.");
- Preconditions.checkState(!existingColMetadata.isSingleValue(), "Column " +
column + "is not MV.");
-
- for (int i = 0; i < numDocs; i++) {
- Object obj = columnReader.getValue(i);
- statsCollector.collect(obj);
+ /**
+ * Returns the max row length for a column.
+ * - For SV column, this is the length of the longest value.
+ * - For MV column, this is the length of the longest MV entry (sum of
lengths of all elements).
+ */
+ private int getMaxRowLength(ColumnMetadata columnMetadata,
ForwardIndexReader<?> forwardIndex,
+ @Nullable Dictionary dictionary)
+ throws IOException {
+ String column = columnMetadata.getColumnName();
+ DataType storedType = columnMetadata.getDataType().getStoredType();
+ assert !storedType.isFixedWidth();
+
+ try (PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(forwardIndex, dictionary, null,
+ columnMetadata.getMaxNumberOfMultiValues())) {
+ AbstractColumnStatisticsCollector statsCollector =
getStatsCollector(column, storedType);
+ int numDocs = columnMetadata.getTotalDocs();
+ for (int i = 0; i < numDocs; i++) {
+ statsCollector.collect(columnReader.getValue(i));
+ }
+ // NOTE: No need to seal the stats collector because value length is
updated while collecting stats.
+ return columnMetadata.isSingleValue() ?
statsCollector.getLengthOfLargestElement()
+ : statsCollector.getMaxRowLengthInBytes();
}
-
- statsCollector.seal();
- return statsCollector.getMaxRowLengthInBytes();
}
- private AbstractColumnStatisticsCollector getStatsCollector(String column,
FieldSpec.DataType storedType)
- throws Exception {
+ private AbstractColumnStatisticsCollector getStatsCollector(String column,
DataType storedType) {
StatsCollectorConfig statsCollectorConfig = new
StatsCollectorConfig(_tableConfig, _schema, null);
- AbstractColumnStatisticsCollector statsCollector;
-
- // TODO(Vivek): Check if checking stored type will be problematic if
column has dictionary. ie will dictionary
- // return INT as the stored type.
switch (storedType) {
case INT:
- statsCollector = new IntColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new IntColumnPreIndexStatsCollector(column,
statsCollectorConfig);
case LONG:
- statsCollector = new LongColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new LongColumnPreIndexStatsCollector(column,
statsCollectorConfig);
case FLOAT:
- statsCollector = new FloatColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new FloatColumnPreIndexStatsCollector(column,
statsCollectorConfig);
case DOUBLE:
- statsCollector = new DoubleColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new DoubleColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ case BIG_DECIMAL:
+ return new BigDecimalColumnPreIndexStatsCollector(column,
statsCollectorConfig);
case STRING:
- statsCollector = new StringColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new StringColumnPreIndexStatsCollector(column,
statsCollectorConfig);
case BYTES:
- statsCollector = new BytesColumnPredIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new BytesColumnPredIndexStatsCollector(column,
statsCollectorConfig);
case MAP:
- statsCollector = new MapColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
- case BIG_DECIMAL:
- statsCollector = new BigDecimalColumnPreIndexStatsCollector(column,
statsCollectorConfig);
- break;
+ return new MapColumnPreIndexStatsCollector(column,
statsCollectorConfig);
default:
- throw new IllegalStateException("Unsupported storedType=" +
storedType.toString() + " for column=" + column);
+ throw new IllegalStateException("Unsupported storedType=" + storedType
+ " for column=" + column);
}
-
- return statsCollector;
}
private boolean hasIndex(String column, IndexType<?, ?, ?> indexType) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 93bc517dff..0125777777 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -39,7 +39,6 @@ public class PinotSegmentColumnReader implements Closeable {
private final Dictionary _dictionary;
private final NullValueVectorReader _nullValueVectorReader;
private final int[] _dictIdBuffer;
- private final int _maxNumValuesPerMVEntry;
public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
DataSource dataSource = indexSegment.getDataSource(column);
@@ -51,11 +50,10 @@ public class PinotSegmentColumnReader implements Closeable {
_nullValueVectorReader = dataSource.getNullValueVector();
if (_forwardIndexReader.isSingleValue()) {
_dictIdBuffer = null;
- _maxNumValuesPerMVEntry = -1;
} else {
- _maxNumValuesPerMVEntry =
dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
- Preconditions.checkState(_maxNumValuesPerMVEntry >= 0,
"maxNumValuesPerMVEntry is negative for an MV column.");
- _dictIdBuffer = new int[_maxNumValuesPerMVEntry];
+ int maxNumValuesPerMVEntry =
dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
+ Preconditions.checkState(maxNumValuesPerMVEntry >= 0,
"maxNumValuesPerMVEntry is negative for an MV column.");
+ _dictIdBuffer = new int[maxNumValuesPerMVEntry];
}
}
@@ -65,7 +63,6 @@ public class PinotSegmentColumnReader implements Closeable {
_forwardIndexReaderContext = _forwardIndexReader.createContext();
_dictionary = dictionary;
_nullValueVectorReader = nullValueVectorReader;
- _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
if (_forwardIndexReader.isSingleValue()) {
_dictIdBuffer = null;
} else {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 2a3040cd20..2de3f12390 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.segment.index.loader;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
@@ -48,6 +49,7 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -62,6 +64,7 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterMethod;
@@ -1017,6 +1020,81 @@ public class ForwardIndexHandlerTest {
}
}
+ @Test
+ public void testChangeCompressionAndIndexVersion()
+ throws Exception {
+ List<String> columns = new ArrayList<>(RAW_SNAPPY_COLUMNS.size() +
RAW_SORTED_COLUMNS.size());
+ columns.addAll(RAW_SNAPPY_COLUMNS);
+ columns.addAll(RAW_SORTED_COLUMNS);
+ for (String column : columns) {
+ // Convert from SNAPPY v2 to LZ4 v4
+ SegmentMetadataImpl existingSegmentMetadata;
+ FieldConfig existingFieldConfig;
+ try (SegmentDirectory segmentDirectory = new
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+ SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+ _segmentDirectory = segmentDirectory;
+ _writer = writer;
+
+ existingSegmentMetadata = segmentDirectory.getSegmentMetadata();
+ ForwardIndexConfig forwardIndexConfig =
+ new
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).withRawIndexWriterVersion(4)
+ .build();
+ ObjectNode indexes = JsonUtils.newObjectNode();
+ indexes.set("forward", forwardIndexConfig.toJsonNode());
+ FieldConfig fieldConfig =
+ new
FieldConfig.Builder(column).withEncodingType(FieldConfig.EncodingType.RAW).withIndexes(indexes).build();
+ existingFieldConfig = _fieldConfigMap.put(column, fieldConfig);
+ assertNotNull(existingFieldConfig);
+ ForwardIndexHandler handler = createForwardIndexHandler();
+ assertTrue(handler.needUpdateIndices(writer));
+ handler.updateIndices(writer);
+ handler.postUpdateIndicesCleanup(writer);
+ // Tear down before validation. Because columns.psf and index map
cleanup happens at segmentDirectory.close()
+ }
+
+ // Validation
+ ColumnMetadata metadata =
existingSegmentMetadata.getColumnMetadataFor(column);
+ testIndexExists(column, StandardIndexes.forward());
+ validateIndexMap(column, false, false);
+ validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
+
+ // Validate metadata properties. Nothing should change when a
forwardIndex is rewritten for compressionType
+ // change.
+ validateMetadataProperties(column, metadata.hasDictionary(),
metadata.getColumnMaxLength(),
+ metadata.getCardinality(), metadata.getTotalDocs(),
metadata.getDataType(), metadata.getFieldType(),
+ metadata.isSorted(), metadata.isSingleValue(),
metadata.getMaxNumberOfMultiValues(),
+ metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
metadata.getMinValue(),
+ metadata.getMaxValue(), false);
+
+ // Convert it back
+ try (SegmentDirectory segmentDirectory = new
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+ SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+ _segmentDirectory = segmentDirectory;
+ _writer = writer;
+ _fieldConfigMap.put(column, existingFieldConfig);
+ ForwardIndexHandler handler = createForwardIndexHandler();
+ assertTrue(handler.needUpdateIndices(writer));
+ handler.updateIndices(writer);
+ handler.postUpdateIndicesCleanup(writer);
+ // Tear down before validation. Because columns.psf and index map
cleanup happens at segmentDirectory.close()
+ }
+
+ // Validation
+ metadata = existingSegmentMetadata.getColumnMetadataFor(column);
+ testIndexExists(column, StandardIndexes.forward());
+ validateIndexMap(column, false, false);
+ validateForwardIndex(column, CompressionCodec.SNAPPY,
metadata.isSorted());
+
+ // Validate metadata properties. Nothing should change when a
forwardIndex is rewritten for compressionType
+ // change.
+ validateMetadataProperties(column, metadata.hasDictionary(),
metadata.getColumnMaxLength(),
+ metadata.getCardinality(), metadata.getTotalDocs(),
metadata.getDataType(), metadata.getFieldType(),
+ metadata.isSorted(), metadata.isSingleValue(),
metadata.getMaxNumberOfMultiValues(),
+ metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
metadata.getMinValue(),
+ metadata.getMaxValue(), false);
+ }
+ }
+
@Test
public void testChangeDictCompression()
throws Exception {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 19bed50b68..98a2a5aefe 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -170,8 +171,7 @@ public class ControllerRequestURLBuilder {
}
public String forTenantInstancesToggle(String tenant, String tenantType,
String state) {
- return StringUtil.join("/", _baseUrl, "tenants", tenant)
- + "?type=" + tenantType + "&state=" + state;
+ return StringUtil.join("/", _baseUrl, "tenants", tenant) + "?type=" +
tenantType + "&state=" + state;
}
public String forLiveBrokerTablesGet() {
@@ -297,6 +297,31 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "tables", tableName, "externalview");
}
+ public String forTableAggregateMetadata(String tableName) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName, "metadata");
+ }
+
+ public String forTableAggregateMetadata(String tableName, @Nullable
List<String> columns) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName, "metadata") +
constructColumnsParameter(columns);
+ }
+
+ private String constructColumnsParameter(@Nullable List<String> columns) {
+ if (!CollectionUtils.isEmpty(columns)) {
+ StringBuilder parameter = new StringBuilder();
+ parameter.append("?columns=");
+ parameter.append(columns.get(0));
+ int numColumns = columns.size();
+ if (numColumns > 1) {
+ for (int i = 1; i < numColumns; i++) {
+ parameter.append("&columns=").append(columns.get(i));
+ }
+ }
+ return parameter.toString();
+ } else {
+ return "";
+ }
+ }
+
public String forSchemaValidate() {
return StringUtil.join("/", _baseUrl, "schemas", "validate");
}
@@ -368,9 +393,10 @@ public class ControllerRequestURLBuilder {
}
public String forSegmentsMetadataFromServer(String tableName) {
- return forSegmentsMetadataFromServer(tableName, null);
+ return forSegmentsMetadataFromServer(tableName, (List<String>) null);
}
+ @Deprecated
public String forSegmentsMetadataFromServer(String tableName, @Nullable
String columns) {
String url = StringUtil.join("/", _baseUrl, "segments", tableName,
"metadata");
if (columns != null) {
@@ -379,6 +405,10 @@ public class ControllerRequestURLBuilder {
return url;
}
+ public String forSegmentsMetadataFromServer(String tableName, @Nullable
List<String> columns) {
+ return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") +
constructColumnsParameter(columns);
+ }
+
public String forSegmentMetadata(String tableName, String segmentName) {
return StringUtil.join("/", _baseUrl, "segments", tableName,
encode(segmentName), "metadata");
}
@@ -441,8 +471,7 @@ public class ControllerRequestURLBuilder {
public String forDeleteMultipleSegments(String tableName, String tableType,
List<String> segments) {
StringBuilder fullUrl = new StringBuilder(
- StringUtil.join("?", StringUtil.join("/", _baseUrl, "segments",
tableName),
- "type=" + tableType));
+ StringUtil.join("?", StringUtil.join("/", _baseUrl, "segments",
tableName), "type=" + tableType));
for (String segment : segments) {
fullUrl.append("&segments=").append(segment);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]