Copilot commented on code in PR #17606:
URL: https://github.com/apache/pinot/pull/17606#discussion_r2764397572
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DimensionTableConfig.java:
##########
@@ -26,12 +27,21 @@
public class DimensionTableConfig extends BaseJsonConfig {
private final boolean _disablePreload;
private final boolean _errorOnDuplicatePrimaryKey;
+ private final boolean _enableUpsert;
+
+ // Fields below are kept for backward compatibility with previous versions
+ // of DimensionTableConfig.
Review Comment:
This backward-compatibility constructor is missing Javadoc. Add a comment
explaining that it exists for compatibility with previous versions of
DimensionTableConfig and that it delegates to the main constructor with upsert
disabled.
```suggestion
// of DimensionTableConfig.
/**
* Backward compatibility constructor that matches previous versions of DimensionTableConfig.
* Delegates to the main constructor with upsert disabled by default.
*/
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -222,38 +262,69 @@ private DimensionTable createFastLookupDimensionTable() {
int totalDocs = 0;
for (SegmentDataManager segmentManager : segmentDataManagers) {
IndexSegment indexSegment = segmentManager.getSegment();
- totalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+ MutableRoaringBitmap queryableDocIds =
getQueryableDocIdsSnapshot(indexSegment);
+ if (queryableDocIds != null) {
+ totalDocs += queryableDocIds.getCardinality();
+ } else {
+ totalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+ }
Review Comment:
This logic for computing totalDocs with queryable doc IDs is duplicated in
both `createFastLookupDimensionTable` and `createMemOptimisedDimensionTable`.
Extract this into a helper method like `getTotalDocCount(IndexSegment segment)`
to reduce duplication and improve maintainability.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2132,6 +2138,89 @@ private void setupDimensionTable() throws Exception {
DIM_NUMBER_OF_RECORDS, 60_000);
}
+ private void setupDimensionUpsertTable() throws Exception {
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName(DIM_UPSERT_TABLE)
+ .addSingleValueDimension("id", FieldSpec.DataType.INT)
+ .addSingleValueDimension("name", FieldSpec.DataType.STRING)
+ .setPrimaryKeyColumns(Collections.singletonList("id"))
+ .build();
+ addSchema(schema);
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"REFRESH", "DAILY", true));
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(DIM_UPSERT_TABLE)
+ .setIsDimTable(true)
+ .setDimensionTableConfig(new DimensionTableConfig(false, false, true))
+ .setIngestionConfig(ingestionConfig)
+ .build();
+ TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(),
getServerTenant(), null);
+ tableConfig.setTenantConfig(tenantConfig);
+ addTableConfig(tableConfig);
+
+ File firstSegment = new File(_tempDir, "dimUpsert_segment_1.csv");
+ List<String> firstSegmentRows = List.of(
+ "id,name",
+ "1,old",
+ "2,keep");
+ FileUtils.writeLines(firstSegment, firstSegmentRows);
+ createAndUploadSegmentFromFileAndWaitForCountWithoutUpsert(tableConfig,
schema, firstSegment, FileFormat.CSV,
+ "segment_1", 2, 60_000);
Review Comment:
The method name `createAndUploadSegmentFromFileAndWaitForCountWithoutUpsert`
is unclear. It can be read as uploading a segment without upsert. What it
actually does is upload the segment normally and then wait for the expected
document count using the `skipUpsert=true` query option.
Consider renaming it to `createAndUploadSegmentAndWaitForRawCount` to make clear
that it waits for the raw (non-deduplicated) document count.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -397,4 +498,73 @@ public FieldSpec getColumnFieldSpec(String columnName) {
public List<String> getPrimaryKeyColumns() {
return _dimensionTable.get().getPrimaryKeyColumns();
}
+
+ private void applyQueryableDocIdsForRecordLocations(List<SegmentDataManager>
segmentDataManagers,
+ Object2ObjectOpenCustomHashMap<Object[], RecordLocation>
recordLocationMap) {
+ if (!_enableUpsert) {
+ return;
+ }
+ if (recordLocationMap.isEmpty() || segmentDataManagers.isEmpty()) {
+ return;
+ }
+ List<MutableRoaringBitmap> queryableDocIdsBySegment = new
ArrayList<>(segmentDataManagers.size());
+ for (int i = 0; i < segmentDataManagers.size(); i++) {
+ queryableDocIdsBySegment.add(new MutableRoaringBitmap());
+ }
+ for (RecordLocation recordLocation : recordLocationMap.values()) {
+
queryableDocIdsBySegment.get(recordLocation._segmentIndex).add(recordLocation._docId);
+ }
+ applyQueryableDocIdsToSegments(segmentDataManagers,
queryableDocIdsBySegment);
+ }
+
+ private void applyQueryableDocIdsForLookupTable(List<SegmentDataManager>
segmentDataManagers,
+ Object2LongOpenCustomHashMap<Object[]> lookupTable) {
+ if (!_enableUpsert) {
+ return;
+ }
+ if (lookupTable.isEmpty() || segmentDataManagers.isEmpty()) {
+ return;
+ }
+ List<MutableRoaringBitmap> queryableDocIdsBySegment = new
ArrayList<>(segmentDataManagers.size());
+ for (int i = 0; i < segmentDataManagers.size(); i++) {
+ queryableDocIdsBySegment.add(new MutableRoaringBitmap());
+ }
+ for (Object2LongOpenCustomHashMap.Entry<Object[]> entry :
lookupTable.object2LongEntrySet()) {
+ long readerIdxAndDocId = entry.getLongValue();
+ int readerIdx = (int) (readerIdxAndDocId >>> 32);
+ int docId = (int) readerIdxAndDocId;
+ queryableDocIdsBySegment.get(readerIdx).add(docId);
+ }
+ applyQueryableDocIdsToSegments(segmentDataManagers,
queryableDocIdsBySegment);
+ }
+
+ private void applyQueryableDocIdsToSegments(List<SegmentDataManager>
segmentDataManagers,
+ List<MutableRoaringBitmap> queryableDocIdsBySegment) {
+ for (int i = 0; i < segmentDataManagers.size(); i++) {
+ IndexSegment segment = segmentDataManagers.get(i).getSegment();
+ if (!(segment instanceof ImmutableSegmentImpl)) {
+ continue;
+ }
+ MutableRoaringBitmap queryableDocIds = queryableDocIdsBySegment.get(i);
+ ThreadSafeMutableRoaringBitmap existingQueryableDocIds =
segment.getQueryableDocIds();
+ if (existingQueryableDocIds != null) {
+ MutableRoaringBitmap existingSnapshot =
existingQueryableDocIds.getMutableRoaringBitmap();
Review Comment:
The intersection (AND operation) with the existing queryable doc IDs is not
explained. Add a comment clarifying why the intersection is needed, for
example to preserve any pre-existing filtering constraints already set on the
segment.
```suggestion
MutableRoaringBitmap existingSnapshot =
existingQueryableDocIds.getMutableRoaringBitmap();
// Intersect with existing queryable doc IDs to honor any pre-existing constraints on the segment
// (e.g. from prior upsert or filtering logic) instead of overwriting them.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]