This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e4184fc3be Construct new IndexLoadingConfig when loading completed
realtime segments (#9938)
e4184fc3be is described below
commit e4184fc3be55a0dd7a9a622f24e356e2a7f8b992
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Wed Dec 7 18:23:53 2022 -0800
Construct new IndexLoadingConfig when loading completed realtime segments
(#9938)
* Construct new IndexLoadingConfig when loading completed realtime segments
* Address review comments
---
.../manager/realtime/RealtimeTableDataManager.java | 9 ++++++--
.../tests/LLCRealtimeClusterIntegrationTest.java | 25 +++++++++++++++++++++-
.../segment/index/loader/IndexLoadingConfig.java | 11 ++++++++--
3 files changed, 40 insertions(+), 5 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 6d5bfe1de8..ee29271b15 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -64,6 +64,7 @@ import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -543,9 +544,13 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to get table config
for table: {}", _tableNameWithType);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableConfig);
- indexLoadingConfig.updateTableConfigAndSchema(tableConfig, schema);
+
+ // Construct a new indexLoadingConfig with the updated tableConfig and
schema.
+ InstanceDataManagerConfig instanceDataManagerConfig =
indexLoadingConfig.getInstanceDataManagerConfig();
+ IndexLoadingConfig newIndexLoadingConfig = new
IndexLoadingConfig(instanceDataManagerConfig, tableConfig, schema);
+
try {
- addSegment(indexDir, indexLoadingConfig);
+ addSegment(indexDir, newIndexLoadingConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 69fab5c183..73fb128e73 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -231,7 +231,7 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
}
@Test
- public void testInvertedIndexTriggering()
+ public void testAddRemoveInvertedIndex()
throws Exception {
long numTotalDocs = getCountStarResult();
@@ -240,7 +240,9 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
assertTrue(queryResponse.get("numEntriesScannedInFilter").asLong() > 0L);
long result =
queryResponse.get("resultTable").get("rows").get(0).get(0).asLong();
+ // Add inverted index and check if numEntriesScannedInFilter is 0.
TableConfig tableConfig = getRealtimeTableConfig();
+ List<String> invertedIndexCols =
tableConfig.getIndexingConfig().getInvertedIndexColumns();
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
updateTableConfig(tableConfig);
reloadRealtimeTable(getTableName());
@@ -261,6 +263,27 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
throw new RuntimeException(e);
}
}, 600_000L, "Failed to generate inverted index");
+
+ // Now, remove the inverted index and check if numEntriesScannedInFilter
matches totalDocs.
+ tableConfig = getRealtimeTableConfig();
+ tableConfig.getIndexingConfig().setInvertedIndexColumns(invertedIndexCols);
+ updateTableConfig(tableConfig);
+ reloadRealtimeTable(getTableName());
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+ // Query result and total docs should not change during reload
+
assertEquals(queryResponse1.get("resultTable").get("rows").get(0).get(0).asLong(),
result);
+ long numEntriesScannedInFilter =
queryResponse1.get("numEntriesScannedInFilter").asLong();
+ long totalDocs = queryResponse1.get("totalDocs").asLong();
+ assertEquals(totalDocs, numTotalDocs);
+
+ return numEntriesScannedInFilter == totalDocs;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Failed to remove inverted index");
}
@Test(expectedExceptions = IOException.class)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 7ff6cf2e65..c4ccc3e7a7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -57,6 +57,7 @@ public class IndexLoadingConfig {
private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2;
public static final String READ_MODE_KEY = "readMode";
+ private InstanceDataManagerConfig _instanceDataManagerConfig = null;
private ReadMode _readMode = ReadMode.DEFAULT_MODE;
private List<String> _sortedColumns = Collections.emptyList();
private Set<String> _invertedIndexColumns = new HashSet<>();
@@ -106,7 +107,7 @@ public class IndexLoadingConfig {
public IndexLoadingConfig(InstanceDataManagerConfig
instanceDataManagerConfig, TableConfig tableConfig,
@Nullable Schema schema) {
extractFromInstanceConfig(instanceDataManagerConfig);
- updateTableConfigAndSchema(tableConfig, schema);
+ extractFromTableConfigAndSchema(tableConfig, schema);
}
@VisibleForTesting
@@ -118,7 +119,11 @@ public class IndexLoadingConfig {
public IndexLoadingConfig() {
}
- public void updateTableConfigAndSchema(TableConfig tableConfig, @Nullable
Schema schema) {
+ public InstanceDataManagerConfig getInstanceDataManagerConfig() {
+ return _instanceDataManagerConfig;
+ }
+
+ private void extractFromTableConfigAndSchema(TableConfig tableConfig,
@Nullable Schema schema) {
if (schema != null) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
}
@@ -298,6 +303,8 @@ public class IndexLoadingConfig {
if (instanceDataManagerConfig == null) {
return;
}
+
+ _instanceDataManagerConfig = instanceDataManagerConfig;
_instanceId = instanceDataManagerConfig.getInstanceId();
ReadMode instanceReadMode = instanceDataManagerConfig.getReadMode();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]