This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec7028e69 handle segments not tracked by partition mgr and add skipUpsertView query option (#13415)
5ec7028e69 is described below
commit 5ec7028e69d861104f502f91b649661c312b56d3
Author: Xiaobing <[email protected]>
AuthorDate: Mon Jun 17 17:18:51 2024 -0700
handle segments not tracked by partition mgr and add skipUpsertView query option (#13415)
* handle segments not tracked by partition mgr and add skipUpsertView query option for easier debugging
---
.../common/utils/config/QueryOptionsUtils.java | 4 +
.../apache/pinot/core/plan/FilterPlanNodeTest.java | 3 +-
.../java/org/apache/pinot/core/plan/TestUtils.java | 43 --------
...adataAndDictionaryAggregationPlanMakerTest.java | 4 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 117 +++++++++++----------
.../upsert/BaseTableUpsertMetadataManager.java | 10 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 21 +++-
.../pinot/segment/local/upsert/UpsertUtils.java | 11 ++
.../BasePartitionUpsertMetadataManagerTest.java | 79 +++++++++++++-
.../apache/pinot/segment/spi/SegmentContext.java | 5 +-
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
11 files changed, 191 insertions(+), 107 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index a7c45c45d3..12bc750679 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -139,6 +139,10 @@ public class QueryOptionsUtils {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT));
}
+ public static boolean isSkipUpsertView(Map<String, String> queryOptions) {
+ return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT_VIEW));
+ }
+
public static long getUpsertViewFreshnessMs(Map<String, String>
queryOptions) {
String freshnessMsString =
queryOptions.get(QueryOptionKey.UPSERT_VIEW_FRESHNESS_MS);
return freshnessMsString != null ? Long.parseLong(freshnessMsString) : -1;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
index 2fe671fa29..4f86f8d85a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
@@ -24,6 +24,7 @@ import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
@@ -71,7 +72,7 @@ public class FilterPlanNodeTest {
// Result should be invariant - always exactly 3 docs
for (int i = 0; i < 10_000; i++) {
SegmentContext segmentContext = new SegmentContext(segment);
-
segmentContext.setQueryableDocIdsSnapshot(TestUtils.getQueryableDocIdsSnapshotFromSegment(segment));
+
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
assertEquals(getNumberOfFilteredDocs(segmentContext, queryContext), 3);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java
deleted file mode 100644
index 171489e5ed..0000000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.plan;
-
-import org.apache.pinot.segment.spi.IndexSegment;
-import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-
-
-public class TestUtils {
- private TestUtils() {
- }
-
- public static MutableRoaringBitmap
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
- MutableRoaringBitmap queryableDocIdsSnapshot = null;
- ThreadSafeMutableRoaringBitmap queryableDocIds =
segment.getQueryableDocIds();
- if (queryableDocIds != null) {
- queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
- } else {
- ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
- if (validDocIds != null) {
- queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
- }
- }
- return queryableDocIdsSnapshot;
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 6d932489e9..e1b9008799 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -32,7 +32,6 @@ import
org.apache.pinot.core.operator.query.FastFilteredCountOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.plan.TestUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -40,6 +39,7 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.UpsertContext;
+import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -161,7 +161,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
assertTrue(operatorClass.isInstance(operator));
SegmentContext segmentContext = new SegmentContext(_upsertIndexSegment);
-
segmentContext.setQueryableDocIdsSnapshot(TestUtils.getQueryableDocIdsSnapshotFromSegment(_upsertIndexSegment));
+
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(_upsertIndexSegment));
Operator<?> upsertOperator =
PLAN_MAKER.makeSegmentPlanNode(segmentContext, queryContext).run();
assertTrue(upsertOperatorClass.isInstance(upsertOperator));
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index d59d77b540..182e75129b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -104,7 +104,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
- // Tracks all the segments managed by this manager (excluding EmptySegment)
+ // Tracks all the segments managed by this manager, excluding EmptySegment
and segments out of metadata TTL.
+ // Basically, it's possible that some segments in the table partition are
not tracked here, as their upsert metadata
+ // is not managed by the manager currently.
protected final Set<IndexSegment> _trackedSegments =
ConcurrentHashMap.newKeySet();
// Track all the immutable segments where changes took place since last
snapshot was taken.
// Note: we need take to take _snapshotLock RLock while updating this set as
it may be updated by the multiple
@@ -1124,24 +1126,29 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
RecordInfo recordInfo) {
if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
_upsertViewLock.writeLock().lock();
+ try {
+ doRemoveDocId(oldSegment, oldDocId);
+ doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ } finally {
+ _upsertViewLock.writeLock().unlock();
+ }
} else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_upsertViewLock.readLock().lock();
- }
- try {
- doRemoveDocId(oldSegment, oldDocId);
- doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
- } finally {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
- _upsertViewLock.writeLock().unlock();
- } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ try {
+ doRemoveDocId(oldSegment, oldDocId);
+ doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
_updatedSegmentsSinceLastRefresh.add(newSegment);
_updatedSegmentsSinceLastRefresh.add(oldSegment);
+ } finally {
_upsertViewLock.readLock().unlock();
// Batch refresh takes WLock. Do it outside RLock for clarity. The R/W
lock ensures that only one thread
// can refresh the bitmaps. The other threads that are about to update
the bitmaps will be blocked until
// refreshing is done.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
+ } else {
+ doRemoveDocId(oldSegment, oldDocId);
+ doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
}
trackUpdatedSegmentsSinceLastSnapshot(oldSegment);
}
@@ -1154,21 +1161,14 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
*/
protected void replaceDocId(IndexSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId,
int newDocId, RecordInfo recordInfo) {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId,
recordInfo);
+ } else {
_upsertViewLock.readLock().lock();
- }
- try {
- validDocIds.replace(oldDocId, newDocId);
- if (queryableDocIds != null) {
- if (recordInfo.isDeleteRecord()) {
- queryableDocIds.remove(oldDocId);
- } else {
- queryableDocIds.replace(oldDocId, newDocId);
- }
- }
- } finally {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ try {
+ doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId,
recordInfo);
_updatedSegmentsSinceLastRefresh.add(segment);
+ } finally {
_upsertViewLock.readLock().unlock();
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
@@ -1176,16 +1176,28 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
}
}
+ private void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId,
int newDocId, RecordInfo recordInfo) {
+ validDocIds.replace(oldDocId, newDocId);
+ if (queryableDocIds != null) {
+ if (recordInfo.isDeleteRecord()) {
+ queryableDocIds.remove(oldDocId);
+ } else {
+ queryableDocIds.replace(oldDocId, newDocId);
+ }
+ }
+ }
+
protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap
validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId,
RecordInfo recordInfo) {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
- _upsertViewLock.readLock().lock();
- }
- try {
+ if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
- } finally {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ } else {
+ _upsertViewLock.readLock().lock();
+ try {
+ doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
_updatedSegmentsSinceLastRefresh.add(segment);
+ } finally {
_upsertViewLock.readLock().unlock();
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
@@ -1193,8 +1205,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
- private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds,
- int docId, RecordInfo recordInfo) {
+ private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId,
RecordInfo recordInfo) {
validDocIds.add(docId);
if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
queryableDocIds.add(docId);
@@ -1202,14 +1214,14 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
}
protected void removeDocId(IndexSegment segment, int docId) {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
- _upsertViewLock.readLock().lock();
- }
- try {
+ if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
doRemoveDocId(segment, docId);
- } finally {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ } else {
+ _upsertViewLock.readLock().lock();
+ try {
+ doRemoveDocId(segment, docId);
_updatedSegmentsSinceLastRefresh.add(segment);
+ } finally {
_upsertViewLock.readLock().unlock();
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
@@ -1276,25 +1288,11 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
}
}
- private static MutableRoaringBitmap
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
- MutableRoaringBitmap queryableDocIdsSnapshot = null;
- ThreadSafeMutableRoaringBitmap queryableDocIds =
segment.getQueryableDocIds();
- if (queryableDocIds != null) {
- queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
- } else {
- ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
- if (validDocIds != null) {
- queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
- }
- }
- return queryableDocIdsSnapshot;
- }
-
private void setSegmentContexts(List<SegmentContext> segmentContexts) {
for (SegmentContext segmentContext : segmentContexts) {
IndexSegment segment = segmentContext.getIndexSegment();
if (_trackedSegments.contains(segment)) {
-
segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment));
+
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
}
}
}
@@ -1320,8 +1318,11 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
for (IndexSegment segment : _trackedSegments) {
- if (current == null ||
_updatedSegmentsSinceLastRefresh.contains(segment)) {
- updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment));
+ // Update bitmap for segment updated since last refresh or not in the
view yet. This also handles segments
+ // that are tracked by _trackedSegments but not by
_updatedSegmentsSinceLastRefresh, like those didn't update
+ // any bitmaps as their docs simply lost all the upsert comparisons
with the existing docs.
+ if (current == null || current.get(segment) == null ||
_updatedSegmentsSinceLastRefresh.contains(segment)) {
+ updated.put(segment,
UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
} else {
updated.put(segment, current.get(segment));
}
@@ -1335,6 +1336,16 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ @VisibleForTesting
+ Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
+ return _segmentQueryableDocIdsMap;
+ }
+
+ @VisibleForTesting
+ Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
+ return _updatedSegmentsSinceLastRefresh;
+ }
+
protected void doClose()
throws IOException {
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index ea5aba5d21..2bdcc798f9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -39,6 +39,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
protected String _tableNameWithType;
protected UpsertContext _context;
+ protected UpsertConfig.ConsistencyMode _consistencyMode;
@Override
public void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager) {
@@ -68,14 +69,17 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
boolean enablePreload = upsertConfig.isEnablePreload();
double metadataTTL = upsertConfig.getMetadataTTL();
double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
- UpsertConfig.ConsistencyMode consistencyMode =
upsertConfig.getConsistencyMode();
+ _consistencyMode = upsertConfig.getConsistencyMode();
+ if (_consistencyMode == null) {
+ _consistencyMode = UpsertConfig.ConsistencyMode.NONE;
+ }
long upsertViewRefreshIntervalMs =
upsertConfig.getUpsertViewRefreshIntervalMs();
File tableIndexDir = tableDataManager.getTableDataDir();
_context = new
UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
.setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
-
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(consistencyMode)
+
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(_consistencyMode)
.setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir)
.setTableDataManager(tableDataManager).build();
LOGGER.info(
@@ -84,7 +88,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
+ " deleted Keys TTL: {}, consistency mode: {}, upsert view
refresh interval: {}ms, table index dir: {}",
getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns,
comparisonColumns, deleteRecordColumn,
hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload,
metadataTTL, deletedKeysTTL,
- consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);
+ _consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);
initCustomVariables();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 4c2f995df8..7b216acfc3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -24,7 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.config.table.UpsertConfig;
/**
@@ -59,9 +62,21 @@ public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMeta
@Override
public void setSegmentContexts(List<SegmentContext> segmentContexts,
Map<String, String> queryOptions) {
- _partitionMetadataManagerMap.forEach(
- (partitionID, upsertMetadataManager) ->
upsertMetadataManager.setSegmentContexts(segmentContexts,
- queryOptions));
+ if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE &&
!QueryOptionsUtils.isSkipUpsertView(queryOptions)) {
+ // Get queryableDocIds bitmaps from partitionMetadataManagers if any
consistency mode is used.
+ _partitionMetadataManagerMap.forEach(
+ (partitionID, upsertMetadataManager) ->
upsertMetadataManager.setSegmentContexts(segmentContexts,
+ queryOptions));
+ }
+ // If no consistency mode is used, we get queryableDocIds bitmaps as kept
by the segment objects directly.
+ // Even if consistency mode is used, we should still check if any segment
doesn't get its validDocIds bitmap,
+ // because partitionMetadataManagers may not track all segments of the
table, like those out of the metadata TTL.
+ for (SegmentContext segmentContext : segmentContexts) {
+ if (segmentContext.getQueryableDocIdsSnapshot() == null) {
+ IndexSegment segment = segmentContext.getIndexSegment();
+
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
+ }
+ }
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 8dd1d8c475..b29daab533 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.spi.IndexSegment;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
@@ -38,6 +39,16 @@ public class UpsertUtils {
private UpsertUtils() {
}
+ @Nullable
+ public static MutableRoaringBitmap
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
+ ThreadSafeMutableRoaringBitmap queryableDocIds =
segment.getQueryableDocIds();
+ if (queryableDocIds != null) {
+ return queryableDocIds.getMutableRoaringBitmap();
+ }
+ ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
+ return validDocIds != null ? validDocIds.getMutableRoaringBitmap() : null;
+ }
+
/**
* Returns an iterator of {@link RecordInfo} for all the documents from the
segment.
*/
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 3a6984829e..24b85d1121 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -412,7 +412,7 @@ public class BasePartitionUpsertMetadataManagerTest {
} finally {
executor.shutdownNow();
}
-
+ assertEquals(segmentContexts.size(), 3);
for (SegmentContext sc : segmentContexts) {
ThreadSafeMutableRoaringBitmap validDocIds =
segmentQueryableDocIdsMap.get(sc.getIndexSegment());
assertNotNull(validDocIds);
@@ -493,10 +493,14 @@ public class BasePartitionUpsertMetadataManagerTest {
} finally {
executor.shutdownNow();
}
+ assertEquals(segmentContexts.size(), 3);
+ assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(),
3);
+
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
// Get the upsert view again, and the existing bitmap objects should be
set in segment contexts.
// The segmentContexts initialized above holds the same bitmaps objects as
from the upsert view.
List<SegmentContext> reuseSegmentContexts = new ArrayList<>();
+ segmentQueryableDocIdsMap.forEach((k, v) -> reuseSegmentContexts.add(new
SegmentContext(k)));
upsertMetadataManager.setSegmentContexts(reuseSegmentContexts, new
HashMap<>());
for (SegmentContext reuseSC : reuseSegmentContexts) {
for (SegmentContext sc : segmentContexts) {
@@ -520,7 +524,80 @@ public class BasePartitionUpsertMetadataManagerTest {
}
// Force refresh the upsert view when getting it, so different bitmap
objects should be set in segment contexts.
+
upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().addAll(Arrays.asList(seg01,
seg02, seg03));
+ List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
+ segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new
SegmentContext(k)));
+ Map<String, String> queryOptions = new HashMap<>();
+ queryOptions.put("upsertViewFreshnessMs", "0");
+ upsertMetadataManager.setSegmentContexts(refreshSegmentContexts,
queryOptions);
+ for (SegmentContext refreshSC : refreshSegmentContexts) {
+ for (SegmentContext sc : segmentContexts) {
+ if (refreshSC.getIndexSegment() == sc.getIndexSegment()) {
+ assertNotSame(refreshSC.getQueryableDocIdsSnapshot(),
sc.getQueryableDocIdsSnapshot());
+ }
+ }
+ ThreadSafeMutableRoaringBitmap validDocIds =
segmentQueryableDocIdsMap.get(refreshSC.getIndexSegment());
+ assertNotNull(validDocIds);
+ // The upsert view holds a clone of the original queryableDocIds held by
the segment object.
+ assertNotSame(refreshSC.getQueryableDocIdsSnapshot(),
validDocIds.getMutableRoaringBitmap());
+ assertEquals(refreshSC.getQueryableDocIdsSnapshot(),
validDocIds.getMutableRoaringBitmap());
+ // docId=0 in seg01 got invalidated.
+ if (refreshSC.getIndexSegment() == seg01) {
+ assertFalse(refreshSC.getQueryableDocIdsSnapshot().contains(0));
+ }
+ // docId=12 in seg03 was newly added.
+ if (refreshSC.getIndexSegment() == seg03) {
+ assertTrue(refreshSC.getQueryableDocIdsSnapshot().contains(12));
+ }
+ }
+ }
+
+ @Test
+ public void testConsistencyModeSnapshotWithUntrackedSegments()
+ throws Exception {
+ UpsertContext upsertContext = mock(UpsertContext.class);
+
when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SNAPSHOT);
+ when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L);
+ DummyPartitionUpsertMetadataManager upsertMetadataManager =
+ new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Map<IndexSegment, ThreadSafeMutableRoaringBitmap>
segmentQueryableDocIdsMap = new HashMap<>();
+ IndexSegment seg01 = mock(IndexSegment.class);
+ ThreadSafeMutableRoaringBitmap validDocIds01 =
createThreadSafeMutableRoaringBitmap(10);
+ AtomicBoolean called = new AtomicBoolean(false);
+ when(seg01.getValidDocIds()).thenReturn(validDocIds01);
+ upsertMetadataManager.trackSegment(seg01);
+ segmentQueryableDocIdsMap.put(seg01, validDocIds01);
+
+ IndexSegment seg02 = mock(IndexSegment.class);
+ ThreadSafeMutableRoaringBitmap validDocIds02 =
createThreadSafeMutableRoaringBitmap(11);
+ when(seg02.getValidDocIds()).thenReturn(validDocIds02);
+ upsertMetadataManager.trackSegment(seg02);
+ segmentQueryableDocIdsMap.put(seg02, validDocIds02);
+
+ IndexSegment seg03 = mock(IndexSegment.class);
+ ThreadSafeMutableRoaringBitmap validDocIds03 =
createThreadSafeMutableRoaringBitmap(12);
+ when(seg03.getValidDocIds()).thenReturn(validDocIds03);
+ upsertMetadataManager.trackSegment(seg03);
+ segmentQueryableDocIdsMap.put(seg03, validDocIds03);
+
+ RecordInfo recordInfo = new RecordInfo(null, 5, null, false);
+ upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 0,
12, recordInfo);
+
+ List<SegmentContext> segmentContexts = new ArrayList<>();
+ segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new
SegmentContext(k)));
+ upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>());
+ assertEquals(segmentContexts.size(), 3);
+ assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(),
3);
+
+ // We can force to refresh the upsert view by clearing up the current
upsert view, even though there are no updated
+ // segments tracked in _updatedSegmentsSinceLastRefresh.
+
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
+ upsertMetadataManager.getSegmentQueryableDocIdsMap().clear();
+
List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
+ segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new
SegmentContext(k)));
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("upsertViewFreshnessMs", "0");
upsertMetadataManager.setSegmentContexts(refreshSegmentContexts,
queryOptions);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
index 743a988769..f2c3b8f837 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.segment.spi;
+import javax.annotation.Nullable;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
public class SegmentContext {
private final IndexSegment _indexSegment;
+ @Nullable
private MutableRoaringBitmap _queryableDocIdsSnapshot = null;
public SegmentContext(IndexSegment indexSegment) {
@@ -33,11 +35,12 @@ public class SegmentContext {
return _indexSegment;
}
+ @Nullable
public MutableRoaringBitmap getQueryableDocIdsSnapshot() {
return _queryableDocIdsSnapshot;
}
- public void setQueryableDocIdsSnapshot(MutableRoaringBitmap
queryableDocIdsSnapshot) {
+ public void setQueryableDocIdsSnapshot(@Nullable MutableRoaringBitmap
queryableDocIdsSnapshot) {
_queryableDocIdsSnapshot = queryableDocIdsSnapshot;
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8b85f861e5..e268a508af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -352,6 +352,7 @@ public class CommonConstants {
public static class QueryOptionKey {
public static final String TIMEOUT_MS = "timeoutMs";
public static final String SKIP_UPSERT = "skipUpsert";
+ public static final String SKIP_UPSERT_VIEW = "skipUpsertView";
public static final String UPSERT_VIEW_FRESHNESS_MS =
"upsertViewFreshnessMs";
public static final String USE_STAR_TREE = "useStarTree";
public static final String SCAN_STAR_TREE_NODES = "scanStarTreeNodes";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]