This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9de57b01eb For consuming segment, avoid using setter in IndexLoadingConfig (#14190)
9de57b01eb is described below
commit 9de57b01eb1502ea3851f15edd6c2aa52f50b698
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 9 11:47:48 2024 -0700
For consuming segment, avoid using setter in IndexLoadingConfig (#14190)
---
.../realtime/RealtimeSegmentDataManager.java | 14 ++-----
.../tests/BaseClusterIntegrationTest.java | 7 ++++
.../tests/LLCRealtimeClusterIntegrationTest.java | 33 +++++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 49 ++++++++++------------
.../converter/RealtimeSegmentConverter.java | 3 --
.../local/realtime/impl/RealtimeSegmentConfig.java | 23 +++++++---
.../segment/index/loader/IndexLoadingConfig.java | 8 ----
7 files changed, 82 insertions(+), 55 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index ecf5cb12cd..bed8f2a310 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -118,6 +118,7 @@ import org.slf4j.LoggerFactory;
/**
* Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
*/
+@SuppressWarnings("jol")
public class RealtimeSegmentDataManager extends SegmentDataManager {
@VisibleForTesting
@@ -237,7 +238,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final StreamDataDecoder _streamDataDecoder;
private final int _segmentMaxRowCount;
private final String _resourceDataDir;
- private final IndexLoadingConfig _indexLoadingConfig;
private final Schema _schema;
// Semaphore for each partitionGroupId only, which is to prevent two
different stream consumers
// from consuming with the same partitionGroupId in parallel in the same
host.
@@ -1446,7 +1446,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_tableNameWithType = _tableConfig.getTableName();
_realtimeTableDataManager = realtimeTableDataManager;
_resourceDataDir = resourceDataDir;
- _indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_serverMetrics = serverMetrics;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
@@ -1478,7 +1477,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
- InstanceDataManagerConfig instanceDataManagerConfig =
_indexLoadingConfig.getInstanceDataManagerConfig();
+ InstanceDataManagerConfig instanceDataManagerConfig =
indexLoadingConfig.getInstanceDataManagerConfig();
String clientIdSuffix =
instanceDataManagerConfig != null ?
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
if (StringUtils.isNotBlank(clientIdSuffix)) {
@@ -1488,7 +1487,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
_segmentLogger =
LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" +
_segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
- if (_indexLoadingConfig.isRealtimeOffHeapAllocation() &&
!_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
+ if (indexLoadingConfig.isRealtimeOffHeapAllocation() &&
!indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
_memoryManager =
new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(),
_segmentNameStr, _serverMetrics);
} else {
@@ -1526,13 +1525,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
sortedColumn = null;
}
}
- // Inverted index columns
- // We need to add sorted column into inverted index columns because when we convert realtime in memory segment into
- // offline segment, we use sorted column's inverted index to maintain the order of the records so that the records
- // are sorted on the sorted column.
- if (sortedColumn != null) {
- indexLoadingConfig.addInvertedIndexColumns(sortedColumn);
- }
// Read the max number of rows
int segmentMaxRowCount =
segmentZKMetadata.getSizeThresholdToFlushSegment();
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 591a0308dd..ffe846cf9c 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -759,4 +759,11 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected long getLongCellValue(JsonNode jsonNode, int colIndex, int
rowIndex) {
return getCellValue(jsonNode, colIndex, rowIndex,
JsonNode::asLong).longValue();
}
+
+ protected JsonNode getColumnIndexSize(String column)
+ throws Exception {
+ return JsonUtils.stringToJsonNode(
+
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
List.of(column))))
+ .get("columnIndexSizeMap").get(column);
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 31dd2d26a3..78b34fc563 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -49,6 +49,7 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory;
import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -73,6 +74,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -321,6 +323,37 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
testReload(false);
}
+ @Test
+ public void testSortedColumn()
+ throws Exception {
+ // There should be no inverted index or range index sealed because the sorted column is not configured with them
+ JsonNode columnIndexSize = getColumnIndexSize(getSortedColumn());
+ assertFalse(columnIndexSize.has(StandardIndexes.INVERTED_ID));
+ assertFalse(columnIndexSize.has(StandardIndexes.RANGE_ID));
+
+ // For point lookup query, there should be no scan from the committed/consuming segments, but full scan from the
+ // uploaded segments:
+ // - Committed segments have sorted index
+ // - Consuming segments have inverted index
+ // - Uploaded segments have neither of them
+ String query = "SELECT COUNT(*) FROM myTable WHERE Carrier = 'DL'";
+ JsonNode response = postQuery(query);
+ long numEntriesScannedInFilter =
response.get("numEntriesScannedInFilter").asLong();
+ long numDocsInUploadedSegments = super.getCountStarResult();
+ assertEquals(numEntriesScannedInFilter, numDocsInUploadedSegments);
+
+ // For range query, there should be no scan from the committed segments, but full scan from the uploaded/consuming
+ // segments:
+ // - Committed segments have sorted index
+ // - Consuming/Uploaded segments do not have sorted index
+ query = "SELECT COUNT(*) FROM myTable WHERE Carrier > 'DL'";
+ response = postQuery(query);
+ numEntriesScannedInFilter =
response.get("numEntriesScannedInFilter").asLong();
+ // NOTE: If this test is running after force commit test, there will be no records in consuming segments
+ assertTrue(numEntriesScannedInFilter >= numDocsInUploadedSegments);
+ assertTrue(numEntriesScannedInFilter < 2 * numDocsInUploadedSegments);
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testAddRemoveDictionaryAndInvertedIndex(boolean
useMultiStageQueryEngine)
throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 97f39f66a4..25a75352f7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1324,10 +1324,10 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
throws Exception {
String column = "DestCityName";
JsonNode columnIndexSize = getColumnIndexSize(column);
- assertTrue(columnIndexSize.has("dictionary"));
- assertTrue(columnIndexSize.has("forward_index"));
- double dictionarySize = columnIndexSize.get("dictionary").asDouble();
- double forwardIndexSize = columnIndexSize.get("forward_index").asDouble();
+ assertTrue(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+ assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+ double dictionarySize =
columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble();
+ double forwardIndexSize =
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
// Convert 'DestCityName' to raw index
TableConfig tableConfig = getOfflineTableConfig();
@@ -1339,9 +1339,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
long numTotalDocs = getCountStarResult();
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
- assertFalse(columnIndexSize.has("dictionary"));
- assertTrue(columnIndexSize.has("forward_index"));
- double v2rawIndexSize = columnIndexSize.get("forward_index").asDouble();
+ assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+ assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+ double v2rawIndexSize =
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
assertTrue(v2rawIndexSize > forwardIndexSize);
// NOTE: Currently Pinot doesn't support directly changing raw index
version, so we need to first reset it back to
@@ -1361,9 +1361,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
- assertFalse(columnIndexSize.has("dictionary"));
- assertTrue(columnIndexSize.has("forward_index"));
- double v4RawIndexSize = columnIndexSize.get("forward_index").asDouble();
+ assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+ assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+ double v4RawIndexSize =
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
assertTrue(v4RawIndexSize < v2rawIndexSize && v4RawIndexSize >
forwardIndexSize);
// Convert 'DestCityName' to SNAPPY compression
@@ -1377,9 +1377,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
- assertFalse(columnIndexSize.has("dictionary"));
- assertTrue(columnIndexSize.has("forward_index"));
- double v4SnappyRawIndexSize =
columnIndexSize.get("forward_index").asDouble();
+ assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+ assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+ double v4SnappyRawIndexSize =
columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
assertTrue(v4SnappyRawIndexSize > v2rawIndexSize);
// Removing FieldConfig should be no-op because compression is not
explicitly set
@@ -1387,9 +1387,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
- assertFalse(columnIndexSize.has("dictionary"));
- assertTrue(columnIndexSize.has("forward_index"));
- assertEquals(columnIndexSize.get("forward_index").asDouble(),
v4SnappyRawIndexSize);
+ assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+ assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+ assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(),
v4SnappyRawIndexSize);
// Adding 'LZ4' compression explicitly should trigger the conversion
forwardIndexConfig = new
ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).build();
@@ -1400,28 +1400,21 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
- assertFalse(columnIndexSize.has("dictionary"));
- assertTrue(columnIndexSize.has("forward_index"));
- assertEquals(columnIndexSize.get("forward_index").asDouble(),
v2rawIndexSize);
+ assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
+ assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
+ assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(),
v2rawIndexSize);
resetForwardIndex(dictionarySize, forwardIndexSize);
}
- private JsonNode getColumnIndexSize(String column)
- throws Exception {
- return JsonUtils.stringToJsonNode(
-
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
List.of(column))))
- .get("columnIndexSizeMap").get(column);
- }
-
private void resetForwardIndex(double expectedDictionarySize, double
expectedForwardIndexSize)
throws Exception {
TableConfig tableConfig = createOfflineTableConfig();
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult());
JsonNode columnIndexSize = getColumnIndexSize("DestCityName");
- assertEquals(columnIndexSize.get("dictionary").asDouble(),
expectedDictionarySize);
- assertEquals(columnIndexSize.get("forward_index").asDouble(),
expectedForwardIndexSize);
+
assertEquals(columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble(),
expectedDictionarySize);
+ assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(),
expectedForwardIndexSize);
}
/**
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 2082f35622..65c69682f0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -63,9 +63,6 @@ public class RealtimeSegmentConverter {
_segmentZKPropsConfig = segmentZKPropsConfig;
_outputPath = outputPath;
_columnIndicesForRealtimeTable = cdc;
- if (cdc.getSortedColumn() != null) {
-
_columnIndicesForRealtimeTable.getInvertedIndexColumns().remove(cdc.getSortedColumn());
- }
_dataSchema = getUpdatedSchema(schema);
_tableName = tableName;
_tableConfig = tableConfig;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 8d196eb645..5b3aeb26d5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -23,14 +23,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -279,18 +281,29 @@ public class RealtimeSegmentConfig {
}
public Builder(IndexLoadingConfig indexLoadingConfig) {
- this(indexLoadingConfig.getFieldIndexConfigByColName());
+ this(indexLoadingConfig.getFieldIndexConfigByColName(),
indexLoadingConfig.getSortedColumns());
}
public Builder(TableConfig tableConfig, Schema schema) {
- this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig,
schema));
+ this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig,
schema),
+ tableConfig.getIndexingConfig().getSortedColumn());
}
- public Builder(Map<String, FieldIndexConfigs> indexConfigsByColName) {
- _indexConfigByCol = new
HashMap<>(HashUtil.getHashMapCapacity(indexConfigsByColName.size()));
+ public Builder(Map<String, FieldIndexConfigs> indexConfigsByColName,
@Nullable List<String> sortedColumns) {
+ _indexConfigByCol =
Maps.newHashMapWithExpectedSize(indexConfigsByColName.size());
for (Map.Entry<String, FieldIndexConfigs> entry :
indexConfigsByColName.entrySet()) {
_indexConfigByCol.put(entry.getKey(), new
FieldIndexConfigs.Builder(entry.getValue()));
}
+ // Add inverted index to sorted column for 2 reasons:
+ // 1. Since sorted index doesn't apply to mutable segment, add inverted index to get better performance
+ // 2. When converting mutable segment to immutable segment, we use sorted column's inverted index to accelerate
+ // the index creation
+ if (CollectionUtils.isNotEmpty(sortedColumns)) {
+ String sortedColumn = sortedColumns.get(0);
+ FieldIndexConfigs.Builder builder =
+ _indexConfigByCol.computeIfAbsent(sortedColumn, k -> new
FieldIndexConfigs.Builder());
+ builder.add(StandardIndexes.inverted(), new IndexConfig(false));
+ }
}
public Builder setTableNameWithType(String tableNameWithType) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index c24c286000..0684fe9097 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -19,7 +19,6 @@
package org.apache.pinot.segment.local.segment.index.loader;
import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -462,13 +461,6 @@ public class IndexLoadingConfig {
_dirty = true;
}
- @Deprecated
- @VisibleForTesting
- public void addInvertedIndexColumns(String... invertedIndexColumns) {
- _invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns));
- _dirty = true;
- }
-
@Deprecated
@VisibleForTesting
public void addNoDictionaryColumns(Collection<String> noDictionaryColumns) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]