This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b241d731b7 Track segments consistently for consistent upsert view
(#13677)
b241d731b7 is described below
commit b241d731b7aec1f0db2bcef86394641bae927372
Author: Xiaobing <[email protected]>
AuthorDate: Fri Aug 16 13:58:25 2024 -0700
Track segments consistently for consistent upsert view (#13677)
* track segments consistently for consistent upsert view and
* add util class UpsertViewManager
* fix a subtle race condition for SNAPSHOT mode
---
.../core/data/manager/DuoSegmentDataManager.java | 118 ++++++++
.../realtime/RealtimeSegmentDataManager.java | 1 +
.../manager/realtime/RealtimeTableDataManager.java | 60 +++-
.../query/executor/ServerQueryExecutorV1Impl.java | 121 ++++++--
.../data/manager/DuoSegmentDataManagerTest.java | 128 ++++++++
.../local/data/manager/SegmentDataManager.java | 12 +-
.../immutable/ImmutableSegmentImpl.java | 3 +
.../indexsegment/mutable/MutableSegmentImpl.java | 49 ++-
.../local/realtime/impl/RealtimeSegmentConfig.java | 32 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 245 +++------------
.../upsert/BaseTableUpsertMetadataManager.java | 5 +
...oncurrentMapPartitionUpsertMetadataManager.java | 11 +-
...nUpsertMetadataManagerForConsistentDeletes.java | 11 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 61 +++-
.../upsert/PartitionUpsertMetadataManager.java | 13 +
.../local/upsert/TableUpsertMetadataManager.java | 14 +
.../pinot/segment/local/upsert/UpsertUtils.java | 39 ++-
.../segment/local/upsert/UpsertViewManager.java | 327 +++++++++++++++++++++
.../BasePartitionUpsertMetadataManagerTest.java | 69 +++--
.../local/upsert/UpsertViewManagerTest.java | 59 ++++
.../apache/pinot/spi/config/table/TableConfig.java | 5 +
21 files changed, 1070 insertions(+), 313 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/DuoSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/DuoSegmentDataManager.java
new file mode 100644
index 0000000000..fb22e78655
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/DuoSegmentDataManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * Segment data manager tracking two segments that share one segment name. E.g. when committing a mutable segment,
+ * a new immutable segment is created to replace the mutable one, and the two segments have the same name.
+ * By tracking both with this segment data manager, queries can access both segments for a complete data view.
+ * The primary segment represents all segments tracked by this manager wherever a single segment or its metadata is
+ * asked for: getSegment(), getSegmentName(), getLoadTimeMs() and getReferenceCount() all delegate to the primary.
+ */
+public class DuoSegmentDataManager extends SegmentDataManager {
+ // Answers every single-segment accessor of this manager.
+ private final SegmentDataManager _primary;
+ // [primary, secondary]; reference counting, getSegments(), offload and destroy iterate over this list in order.
+ private final List<SegmentDataManager> _segmentDataManagers;
+
+ /**
+ * @param primary manager representing this duo manager, e.g. the new segment replacing an old one
+ * @param secondary additional manager whose segment is also exposed to queries, e.g. the segment being replaced
+ */
+ public DuoSegmentDataManager(SegmentDataManager primary, SegmentDataManager
secondary) {
+ _primary = primary;
+ _segmentDataManagers = Arrays.asList(_primary, secondary);
+ }
+
+ @Override
+ public long getLoadTimeMs() {
+ return _primary.getLoadTimeMs();
+ }
+
+ // Reports only the primary's reference count; the secondary's count is not reflected here.
+ @Override
+ public synchronized int getReferenceCount() {
+ return _primary.getReferenceCount();
+ }
+
+ @Override
+ public String getSegmentName() {
+ return _primary.getSegmentName();
+ }
+
+ @Override
+ public IndexSegment getSegment() {
+ return _primary.getSegment();
+ }
+
+ /**
+ * Calls increaseReferenceCount() on every tracked manager, primary first.
+ *
+ * @return true if at least one tracked manager returned true
+ */
+ @Override
+ public synchronized boolean increaseReferenceCount() {
+ boolean any = false;
+ for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+ if (segmentDataManager.increaseReferenceCount()) {
+ any = true;
+ }
+ }
+ return any;
+ }
+
+ /**
+ * Calls decreaseReferenceCount() on every tracked manager, primary first.
+ *
+ * @return true if at least one tracked manager returned true
+ */
+ @Override
+ public synchronized boolean decreaseReferenceCount() {
+ boolean any = false;
+ for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+ if (segmentDataManager.decreaseReferenceCount()) {
+ any = true;
+ }
+ }
+ return any;
+ }
+
+ @Override
+ public boolean hasMultiSegments() {
+ return true;
+ }
+
+ /**
+ * Returns the segments of the tracked managers whose reference count is still above 0, primary first. The list
+ * holds fewer than two segments (possibly none) once a tracked manager's reference count has dropped to 0.
+ */
+ @Override
+ public List<IndexSegment> getSegments() {
+ List<IndexSegment> segments = new ArrayList<>(_segmentDataManagers.size());
+ for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+ if (segmentDataManager.getReferenceCount() > 0) {
+ segments.add(segmentDataManager.getSegment());
+ }
+ }
+ return segments;
+ }
+
+ // Offloads only the tracked managers whose reference count is 0; the others are left untouched.
+ @Override
+ public void doOffload() {
+ for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+ if (segmentDataManager.getReferenceCount() == 0) {
+ segmentDataManager.offload();
+ }
+ }
+ }
+
+ // Destroys only the tracked managers whose reference count is 0; the others are left untouched.
+ @Override
+ protected void doDestroy() {
+ for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+ if (segmentDataManager.getReferenceCount() == 0) {
+ segmentDataManager.destroy();
+ }
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index b04bf4a36e..db9c63770a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1565,6 +1565,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
.setNullHandlingEnabled(_nullHandlingEnabled)
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
+ .setUpsertConsistencyMode(tableConfig.getUpsertConsistencyMode())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8a99610c75..aa4e061204 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -49,6 +49,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
+import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -98,7 +99,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// The semaphores will stay in the hash map even if the consuming partitions
move to a different host.
// We expect that there will be a small number of semaphores, but that may
be ok.
private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new
ConcurrentHashMap<>();
-
// The old name of the stats file used to be stats.ser which we changed when
we moved all packages
// from com.linkedin to org.apache because of not being able to deserialize
the old files using the newer classes
private static final String STATS_FILE_NAME = "segment-stats.ser";
@@ -507,9 +507,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
new RealtimeSegmentDataManager(zkMetadata, tableConfig, this,
_indexDir.getAbsolutePath(), indexLoadingConfig,
schema, llcSegmentName, semaphore, _serverMetrics,
partitionUpsertMetadataManager,
partitionDedupMetadataManager, _isTableReadyToConsumeData);
+ registerSegment(segmentName, realtimeSegmentDataManager,
partitionUpsertMetadataManager);
realtimeSegmentDataManager.startConsumption();
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1);
- registerSegment(segmentName, realtimeSegmentDataManager);
_logger.info("Added new CONSUMING segment: {}", segmentName);
}
@@ -603,7 +603,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Preloading segment is ensured to be handled by a single thread, so no
need to take the segment upsert lock.
// Besides, preloading happens before the table partition is made ready
for any queries.
partitionUpsertMetadataManager.preloadSegment(immutableSegment);
- registerSegment(segmentName, newSegmentManager);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
_logger.info("Preloaded immutable segment: {} with upsert enabled",
segmentName);
return;
}
@@ -614,24 +614,52 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// segments may be invalidated, making the queries see less valid docs
than expected. We should let query
// access the new segment asap even though its validDocId bitmap is
still being filled by
// partitionUpsertMetadataManager.
- registerSegment(segmentName, newSegmentManager);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
partitionUpsertMetadataManager.addSegment(immutableSegment);
_logger.info("Added new immutable segment: {} with upsert enabled",
segmentName);
} else {
- // When replacing a segment, we should register the new segment 'after'
it is fully initialized by
- // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
- // to the valid docs in the old segment immediately, but the validDocId
bitmap of the new segment is still
- // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
- // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
- // consistency, otherwise the new segment should be named differently to
go through the addSegment flow above.
- IndexSegment oldSegment = oldSegmentManager.getSegment();
+ replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager,
partitionUpsertMetadataManager);
+ }
+ }
+
+ /**
+ * Replaces oldSegmentManager with newSegmentManager for an upsert table, then offloads and releases the old one.
+ * How the replacement is made visible to queries depends on the table's upsert consistency mode.
+ */
+ private void replaceUpsertSegment(String segmentName, SegmentDataManager
oldSegmentManager,
+ ImmutableSegmentDataManager newSegmentManager,
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+ // In NONE consistency mode, we should register the new segment 'after' it is fully initialized by
+ // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access
+ // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still
+ // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected.
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ ImmutableSegment immutableSegment = newSegmentManager.getSegment();
+ UpsertConfig.ConsistencyMode consistencyMode =
_tableUpsertMetadataManager.getUpsertConsistencyMode();
+ if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
+ } else {
+ // In NONE mode (the if-branch above), the old segment is kept intact and visible to queries until the new
+ // segment is registered. But the newly ingested records will invalidate valid docs in the new segment as the
+ // upsert metadata gets updated during replacement, so queries would miss those updates in the new segment
+ // until it's registered after the replacement is done.
+ // For a consistent data view, we make both the old and new segments visible to queries and update both in
+ // place while segment replacement and new data ingestion happen in parallel: the duo manager (new segment as
+ // primary) is registered first, and is replaced by the new segment's own manager once replaceSegment() returns.
+ SegmentDataManager duoSegmentDataManager = new
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
+ registerSegment(segmentName, duoSegmentDataManager,
partitionUpsertMetadataManager);
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
+ }
+ _logger.info("Replaced {} segment: {} with upsert enabled and consistency
mode: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, consistencyMode);
+ // Offload the old segment from metadata management (queries may still be accessing it), then release it.
+ oldSegmentManager.offload();
+ releaseSegment(oldSegmentManager);
+ }
+
+ /**
+ * Registers the segment to this table data manager. When a partition upsert metadata manager is given, the
+ * segment is first tracked for the upsert view by that manager.
+ */
+ private void registerSegment(String segmentName, SegmentDataManager
segmentDataManager,
+ @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager)
{
+ if (partitionUpsertMetadataManager != null) {
+ // Register segment to the upsert metadata manager before registering it to table manager, so that the upsert
+ // metadata manager can update the upsert view before the segment becomes visible to queries.
+
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
}
+ registerSegment(segmentName, segmentDataManager);
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 8c2906db54..7253a8abfc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -67,12 +68,14 @@ import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.util.trace.TraceContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryCancelledException;
@@ -203,18 +206,63 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
List<String> optionalSegments = queryRequest.getOptionalSegments();
List<String> notAcquiredSegments = new ArrayList<>();
- List<SegmentDataManager> segmentDataManagers =
- tableDataManager.acquireSegments(segmentsToQuery, optionalSegments,
notAcquiredSegments);
- int numSegmentsAcquired = segmentDataManagers.size();
+ int numSegmentsAcquired;
+ List<SegmentDataManager> segmentDataManagers;
+ List<IndexSegment> indexSegments;
+ Map<IndexSegment, SegmentContext> providedSegmentContexts = null;
+ if (!isUpsertTableUsingConsistencyMode(tableDataManager)) {
+ segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery,
optionalSegments, notAcquiredSegments);
+ numSegmentsAcquired = segmentDataManagers.size();
+ indexSegments = new ArrayList<>(numSegmentsAcquired);
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ indexSegments.add(segmentDataManager.getSegment());
+ }
+ } else {
+ RealtimeTableDataManager rtdm = (RealtimeTableDataManager)
tableDataManager;
+ TableUpsertMetadataManager tumm = rtdm.getTableUpsertMetadataManager();
+ tumm.lockForSegmentContexts();
+ try {
+ // Server can start consuming segment before broker can update its
routing table upon IdealState changes, so
+ // broker can miss the new consuming segment even with the previous
fix #11978. For a complete upsert data view,
+ // the consuming segment should be acquired all the time speculatively
if it's not included by the broker.
+ Set<String> allSegmentsToQuery = new HashSet<>(segmentsToQuery);
+ if (optionalSegments == null) {
+ optionalSegments = new ArrayList<>();
+ } else {
+ allSegmentsToQuery.addAll(optionalSegments);
+ }
+ for (String segmentName : tumm.getOptionalSegments()) {
+ if (!allSegmentsToQuery.contains(segmentName)) {
+ optionalSegments.add(segmentName);
+ }
+ }
+ segmentDataManagers =
tableDataManager.acquireSegments(segmentsToQuery, optionalSegments,
notAcquiredSegments);
+ numSegmentsAcquired = segmentDataManagers.size();
+ indexSegments = new ArrayList<>(numSegmentsAcquired);
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ // When using consistency mode, a segment data manager may track two
segments with same name, and they both
+ // contain part of the valid docs for the segment.
+ if (segmentDataManager.hasMultiSegments()) {
+ indexSegments.addAll(segmentDataManager.getSegments());
+ } else {
+ indexSegments.add(segmentDataManager.getSegment());
+ }
+ }
+ List<SegmentContext> segmentContexts =
+ tableDataManager.getSegmentContexts(indexSegments,
queryContext.getQueryOptions());
+ providedSegmentContexts = new HashMap<>(segmentContexts.size());
+ for (SegmentContext sc : segmentContexts) {
+ providedSegmentContexts.put(sc.getIndexSegment(), sc);
+ }
+ } finally {
+ tumm.unlockForSegmentContexts();
+ }
+ }
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Processing requestId: {} with segmentsToQuery: {},
optionalSegments: {} and acquiredSegments: {}",
requestId, segmentsToQuery, optionalSegments,
segmentDataManagers.stream().map(SegmentDataManager::getSegmentName).collect(Collectors.toList()));
}
- List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- indexSegments.add(segmentDataManager.getSegment());
- }
// Gather stats for realtime consuming segments
// TODO: the freshness time should not be collected at query time because
there is no guarantee that the consuming
@@ -259,8 +307,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
InstanceResponseBlock instanceResponse = null;
try {
instanceResponse =
- executeInternal(tableDataManager, indexSegments, queryContext,
timerContext, executorService, streamer,
- queryRequest.isEnableStreaming());
+ executeInternal(tableDataManager, indexSegments,
providedSegmentContexts, queryContext, timerContext,
+ executorService, streamer, queryRequest.isEnableStreaming());
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
instanceResponse = new InstanceResponseBlock();
@@ -346,12 +394,27 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
return instanceResponse;
}
+ /**
+ * Returns true only when the table is a RealtimeTableDataManager with upsert enabled and its upsert consistency
+ * mode is not NONE; returns false for every other table.
+ */
+ private boolean isUpsertTableUsingConsistencyMode(TableDataManager
tableDataManager) {
+ // For upsert table using consistency mode, we have to acquire segments and get their validDocIds atomically.
+ // Otherwise, the query may not have access to new segments added just before it gets validDocIds for the acquired
+ // segments, thus missing valid docs replaced by the newly added segments.
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ RealtimeTableDataManager rtdm = (RealtimeTableDataManager)
tableDataManager;
+ if (rtdm.isUpsertEnabled()) {
+ return rtdm.getTableUpsertMetadataManager().getUpsertConsistencyMode()
!= UpsertConfig.ConsistencyMode.NONE;
+ }
+ }
+ return false;
+ }
+
// NOTE: This method might change indexSegments. Do not use it after calling
this method.
private InstanceResponseBlock executeInternal(TableDataManager
tableDataManager, List<IndexSegment> indexSegments,
- QueryContext queryContext, TimerContext timerContext, ExecutorService
executorService,
- @Nullable ResultsBlockStreamer streamer, boolean enableStreaming)
+ @Nullable Map<IndexSegment, SegmentContext> providedSegmentContexts,
QueryContext queryContext,
+ TimerContext timerContext, ExecutorService executorService, @Nullable
ResultsBlockStreamer streamer,
+ boolean enableStreaming)
throws Exception {
- handleSubquery(queryContext, tableDataManager, indexSegments,
timerContext, executorService);
+ handleSubquery(queryContext, tableDataManager, indexSegments,
providedSegmentContexts, timerContext,
+ executorService);
// Compute total docs for the table before pruning the segments
long numTotalDocs = 0;
@@ -382,8 +445,13 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
} else {
TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
- List<SegmentContext> selectedSegmentContexts =
- tableDataManager.getSegmentContexts(selectedSegments,
queryContext.getQueryOptions());
+ List<SegmentContext> selectedSegmentContexts;
+ if (providedSegmentContexts == null) {
+ selectedSegmentContexts =
tableDataManager.getSegmentContexts(selectedSegments,
queryContext.getQueryOptions());
+ } else {
+ selectedSegmentContexts = new ArrayList<>(selectedSegments.size());
+ selectedSegments.forEach(s ->
selectedSegmentContexts.add(providedSegmentContexts.get(s)));
+ }
Plan queryPlan =
enableStreaming ?
_planMaker.makeStreamingInstancePlan(selectedSegmentContexts, queryContext,
executorService,
streamer, _serverMetrics)
@@ -529,11 +597,12 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
* <p>Currently only supports subquery within the filter.
*/
private void handleSubquery(QueryContext queryContext, TableDataManager
tableDataManager,
- List<IndexSegment> indexSegments, TimerContext timerContext,
ExecutorService executorService)
+ List<IndexSegment> indexSegments, @Nullable Map<IndexSegment,
SegmentContext> providedSegmentContexts,
+ TimerContext timerContext, ExecutorService executorService)
throws Exception {
FilterContext filter = queryContext.getFilter();
if (filter != null && !filter.isConstant()) {
- handleSubquery(filter, tableDataManager, indexSegments, timerContext,
executorService,
+ handleSubquery(filter, tableDataManager, indexSegments,
providedSegmentContexts, timerContext, executorService,
queryContext.getEndTimeMs());
}
}
@@ -543,16 +612,18 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
* <p>Currently only supports subquery within the lhs of the predicate.
*/
private void handleSubquery(FilterContext filter, TableDataManager
tableDataManager, List<IndexSegment> indexSegments,
- TimerContext timerContext, ExecutorService executorService, long
endTimeMs)
+ @Nullable Map<IndexSegment, SegmentContext> providedSegmentContexts,
TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
throws Exception {
List<FilterContext> children = filter.getChildren();
if (children != null) {
for (FilterContext child : children) {
- handleSubquery(child, tableDataManager, indexSegments, timerContext,
executorService, endTimeMs);
+ handleSubquery(child, tableDataManager, indexSegments,
providedSegmentContexts, timerContext, executorService,
+ endTimeMs);
}
} else {
- handleSubquery(filter.getPredicate().getLhs(), tableDataManager,
indexSegments, timerContext, executorService,
- endTimeMs);
+ handleSubquery(filter.getPredicate().getLhs(), tableDataManager,
indexSegments, providedSegmentContexts,
+ timerContext, executorService, endTimeMs);
}
}
@@ -564,7 +635,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
* rewritten to an IN_ID_SET transform function.
*/
private void handleSubquery(ExpressionContext expression, TableDataManager
tableDataManager,
- List<IndexSegment> indexSegments, TimerContext timerContext,
ExecutorService executorService, long endTimeMs)
+ List<IndexSegment> indexSegments, @Nullable Map<IndexSegment,
SegmentContext> providedSegmentContexts,
+ TimerContext timerContext, ExecutorService executorService, long
endTimeMs)
throws Exception {
FunctionContext function = expression.getFunction();
if (function == null) {
@@ -591,8 +663,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
subquery.setEndTimeMs(endTimeMs);
// Make a clone of indexSegments because the method might modify the list
InstanceResponseBlock instanceResponse =
- executeInternal(tableDataManager, new ArrayList<>(indexSegments),
subquery, timerContext, executorService,
- null, false);
+ executeInternal(tableDataManager, new ArrayList<>(indexSegments),
providedSegmentContexts, subquery,
+ timerContext, executorService, null, false);
BaseResultsBlock resultsBlock = instanceResponse.getResultsBlock();
Preconditions.checkState(resultsBlock instanceof AggregationResultsBlock,
"Got unexpected results block type: %s, expecting aggregation
results",
@@ -605,7 +677,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
arguments.set(1,
ExpressionContext.forLiteral(RequestUtils.getLiteral(((IdSet)
result).toBase64String())));
} else {
for (ExpressionContext argument : arguments) {
- handleSubquery(argument, tableDataManager, indexSegments,
timerContext, executorService, endTimeMs);
+ handleSubquery(argument, tableDataManager, indexSegments,
providedSegmentContexts, timerContext,
+ executorService, endTimeMs);
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/DuoSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/DuoSegmentDataManagerTest.java
new file mode 100644
index 0000000000..acc030cfe6
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/DuoSegmentDataManagerTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests {@link DuoSegmentDataManager} with mocked segment data managers: an immutable primary (sdm1) and a mutable
+ * secondary (sdm2) sharing the segment name "seg01", both stubbed with a reference count of 1.
+ */
+public class DuoSegmentDataManagerTest {
+ // getSegment() always returns the primary's segment, while getSegments() drops the segment of any tracked manager
+ // whose reference count is 0.
+ @Test
+ public void testGetSegments() {
+ SegmentDataManager sdm1 = mockSegmentDataManager("seg01", false, 1);
+ SegmentDataManager sdm2 = mockSegmentDataManager("seg01", true, 1);
+ DuoSegmentDataManager dsdm = new DuoSegmentDataManager(sdm1, sdm2);
+
+ assertTrue(dsdm.hasMultiSegments());
+ assertSame(dsdm.getSegment(), sdm1.getSegment());
+ assertEquals(dsdm.getSegments(), Arrays.asList(sdm1.getSegment(),
sdm2.getSegment()));
+
+ when(sdm1.getReferenceCount()).thenReturn(0);
+ assertTrue(dsdm.hasMultiSegments());
+ assertSame(dsdm.getSegment(), sdm1.getSegment());
+ assertEquals(dsdm.getSegments(),
Collections.singletonList(sdm2.getSegment()));
+
+ when(sdm2.getReferenceCount()).thenReturn(0);
+ assertTrue(dsdm.hasMultiSegments());
+ assertSame(dsdm.getSegment(), sdm1.getSegment());
+ assertTrue(dsdm.getSegments().isEmpty());
+ }
+
+ // increase/decreaseReferenceCount() on the duo manager return true iff at least one tracked manager returns true.
+ @Test
+ public void testIncDecRefCnt() {
+ SegmentDataManager sdm1 = mockSegmentDataManager("seg01", false, 1);
+ SegmentDataManager sdm2 = mockSegmentDataManager("seg01", true, 1);
+ DuoSegmentDataManager dsdm = new DuoSegmentDataManager(sdm1, sdm2);
+
+ when(sdm1.increaseReferenceCount()).thenReturn(false);
+ when(sdm2.increaseReferenceCount()).thenReturn(false);
+ when(sdm1.decreaseReferenceCount()).thenReturn(false);
+ when(sdm2.decreaseReferenceCount()).thenReturn(false);
+ assertFalse(dsdm.increaseReferenceCount());
+ assertFalse(dsdm.decreaseReferenceCount());
+
+ when(sdm1.increaseReferenceCount()).thenReturn(true);
+ when(sdm2.increaseReferenceCount()).thenReturn(false);
+ when(sdm1.decreaseReferenceCount()).thenReturn(true);
+ when(sdm2.decreaseReferenceCount()).thenReturn(false);
+ assertTrue(dsdm.increaseReferenceCount());
+ assertTrue(dsdm.decreaseReferenceCount());
+
+ when(sdm1.increaseReferenceCount()).thenReturn(true);
+ when(sdm2.increaseReferenceCount()).thenReturn(true);
+ when(sdm1.decreaseReferenceCount()).thenReturn(true);
+ when(sdm2.decreaseReferenceCount()).thenReturn(true);
+ assertTrue(dsdm.increaseReferenceCount());
+ assertTrue(dsdm.decreaseReferenceCount());
+ }
+
+ // doOffload()/doDestroy() only forward offload()/destroy() to tracked managers whose reference count is 0.
+ @Test
+ public void testDoOffloadDestroy() {
+ SegmentDataManager sdm1 = mockSegmentDataManager("seg01", false, 1);
+ SegmentDataManager sdm2 = mockSegmentDataManager("seg01", true, 1);
+ DuoSegmentDataManager dsdm = new DuoSegmentDataManager(sdm1, sdm2);
+
+ dsdm.doOffload();
+ verify(sdm1, times(0)).offload();
+ verify(sdm2, times(0)).offload();
+ dsdm.doDestroy();
+ verify(sdm1, times(0)).destroy();
+ verify(sdm2, times(0)).destroy();
+
+ when(sdm1.getReferenceCount()).thenReturn(0);
+ dsdm.doOffload();
+ verify(sdm1, times(1)).offload();
+ verify(sdm2, times(0)).offload();
+ dsdm.doDestroy();
+ verify(sdm1, times(1)).destroy();
+ verify(sdm2, times(0)).destroy();
+
+ // reset() clears sdm1's stubs and recorded invocations, so its verify counts restart from 0; its unstubbed
+ // getReferenceCount() now returns Mockito's default of 0, so sdm1 is offloaded/destroyed once more below.
+ reset(sdm1);
+ when(sdm2.getReferenceCount()).thenReturn(0);
+ dsdm.doOffload();
+ verify(sdm1, times(1)).offload();
+ verify(sdm2, times(1)).offload();
+ dsdm.doDestroy();
+ verify(sdm1, times(1)).destroy();
+ verify(sdm2, times(1)).destroy();
+ }
+
+ // Creates a mock manager (RealtimeSegmentDataManager with a MutableSegment when isMutable, otherwise
+ // ImmutableSegmentDataManager with an ImmutableSegment) with stubbed segment name, segment and reference count.
+ private SegmentDataManager mockSegmentDataManager(String segmentName,
boolean isMutable, int refCnt) {
+ SegmentDataManager segmentDataManager =
+ isMutable ? mock(RealtimeSegmentDataManager.class) :
mock(ImmutableSegmentDataManager.class);
+ IndexSegment segment = isMutable ? mock(MutableSegment.class) :
mock(ImmutableSegment.class);
+ when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
+ when(segmentDataManager.getSegment()).thenReturn(segment);
+ when(segmentDataManager.getReferenceCount()).thenReturn(refCnt);
+ return segmentDataManager;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
index 77b01a97e8..2edd3c134a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
@@ -18,7 +18,8 @@
*/
package org.apache.pinot.segment.local.data.manager;
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -36,7 +37,6 @@ public abstract class SegmentDataManager {
return _loadTimeMs;
}
- @VisibleForTesting
public synchronized int getReferenceCount() {
return _referenceCount;
}
@@ -74,6 +74,14 @@ public abstract class SegmentDataManager {
public abstract IndexSegment getSegment();
+ public boolean hasMultiSegments() {
+ return false;
+ }
+
+ public List<IndexSegment> getSegments() {
+ return Collections.emptyList();
+ }
+
/**
* Offloads the segment from the metadata management (e.g. upsert metadata),
but not releases the resources yet
* because there might be queries still accessing the segment.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index ff49a36c8d..76129dabb8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -265,6 +265,9 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
public void destroy() {
String segmentName = getSegmentName();
LOGGER.info("Trying to destroy segment : {}", segmentName);
+ if (_partitionUpsertMetadataManager != null) {
+ _partitionUpsertMetadataManager.untrackSegmentForUpsertView(this);
+ }
// StarTreeIndexContainer refers to other column index containers, so
close it firstly.
if (_starTreeIndexContainer != null) {
try {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 730d7a02ea..908f220471 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -91,6 +91,7 @@ import
org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
@@ -162,6 +163,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final String _dedupTimeColumn;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private final UpsertConfig.ConsistencyMode _upsertConsistencyMode;
private final List<String> _upsertComparisonColumns;
private final String _deleteRecordColumn;
private final String _upsertOutOfOrderRecordColumn;
@@ -363,6 +365,7 @@ public class MutableSegmentImpl implements MutableSegment {
}
_partitionUpsertMetadataManager =
config.getPartitionUpsertMetadataManager();
+ _upsertConsistencyMode = config.getUpsertConsistencyMode();
if (_partitionUpsertMetadataManager != null) {
Preconditions.checkState(!isAggregateMetricsEnabled(),
"Metrics aggregation and upsert cannot be enabled together");
@@ -488,23 +491,37 @@ public class MutableSegmentImpl implements MutableSegment
{
if (isUpsertEnabled()) {
RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow =
_partitionUpsertMetadataManager.updateRecord(row, recordInfo);
- // if record doesn't need to be dropped, then persist in segment and
update metadata hashmap
- // we are doing metadata update first followed by segment data update
here, there can be a scenario where
- // segment indexing or addNewRow call errors out in those scenario,
there can be metadata inconsistency where
- // a key is pointing to some other key's docID
- // TODO fix this metadata mismatch scenario
- boolean isOutOfOrderRecord =
!_partitionUpsertMetadataManager.addRecord(this, recordInfo);
- if (_upsertOutOfOrderRecordColumn != null) {
- updatedRow.putValue(_upsertOutOfOrderRecordColumn,
BooleanUtils.toInt(isOutOfOrderRecord));
- }
- if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
+ if (_upsertConsistencyMode != UpsertConfig.ConsistencyMode.NONE) {
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
- // Update number of documents indexed before handling the upsert
metadata so that the record becomes queryable
- // once validated
numDocsIndexed++;
+ canTakeMore = numDocsIndexed < _capacity;
+ _numDocsIndexed = numDocsIndexed;
+ // Index the record and update _numDocsIndexed counter before updating
the upsert metadata so that the record
+ // becomes queryable before validDocIds bitmaps are updated. This
order is important for consistent upsert view,
+ // otherwise the latest doc can be missed by query due to 'docId <
_numDocs' check in query filter operators.
+ // NOTE: out-of-order records can not be dropped or marked when
consistent upsert view is enabled.
+ _partitionUpsertMetadataManager.addRecord(this, recordInfo);
+ } else {
+ // if record doesn't need to be dropped, then persist in segment and
update metadata hashmap
+ // we are doing metadata update first followed by segment data update
here, there can be a scenario where
+ // segment indexing or addNewRow call errors out in those scenario,
there can be metadata inconsistency where
+ // a key is pointing to some other key's docID
+ // TODO fix this metadata mismatch scenario
+ boolean isOutOfOrderRecord =
!_partitionUpsertMetadataManager.addRecord(this, recordInfo);
+ if (_upsertOutOfOrderRecordColumn != null) {
+ updatedRow.putValue(_upsertOutOfOrderRecordColumn,
BooleanUtils.toInt(isOutOfOrderRecord));
+ }
+ if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
+ updateDictionary(updatedRow);
+ addNewRow(numDocsIndexed, updatedRow);
+ // Update number of documents indexed before handling the upsert
metadata so that the record becomes queryable
+ // once validated
+ numDocsIndexed++;
+ }
+ canTakeMore = numDocsIndexed < _capacity;
+ _numDocsIndexed = numDocsIndexed;
}
- canTakeMore = numDocsIndexed < _capacity;
} else {
// Update dictionary first
updateDictionary(row);
@@ -523,8 +540,8 @@ public class MutableSegmentImpl implements MutableSegment {
aggregateMetrics(row, docId);
canTakeMore = true;
}
+ _numDocsIndexed = numDocsIndexed;
}
- _numDocsIndexed = numDocsIndexed;
// Update last indexed time and latest ingestion time
_lastIndexedTimeMs = System.currentTimeMillis();
@@ -960,7 +977,9 @@ public class MutableSegmentImpl implements MutableSegment {
@Override
public void destroy() {
_logger.info("Trying to close RealtimeSegmentImpl : {}", _segmentName);
-
+ if (_partitionUpsertMetadataManager != null) {
+ _partitionUpsertMetadataManager.untrackSegmentForUpsertView(this);
+ }
// Gather statistics for off-heap mode
if (_offHeap) {
if (_numDocsIndexed > 0) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 2d5e7d10d2..6fb0c203f5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -60,6 +60,7 @@ public class RealtimeSegmentConfig {
private final boolean _aggregateMetrics;
private final boolean _nullHandlingEnabled;
private final UpsertConfig.Mode _upsertMode;
+ private final UpsertConfig.ConsistencyMode _upsertConsistencyMode;
private final List<String> _upsertComparisonColumns;
private final String _upsertDeleteRecordColumn;
private final String _upsertOutOfOrderRecordColumn;
@@ -74,14 +75,14 @@ public class RealtimeSegmentConfig {
// TODO: Clean up this constructor. Most of these things can be extracted
from tableConfig.
private RealtimeSegmentConfig(String tableNameWithType, String segmentName,
String streamName, Schema schema,
String timeColumnName, int capacity, int avgNumMultiValues, Map<String,
FieldIndexConfigs> indexConfigByCol,
- SegmentZKMetadata segmentZKMetadata, boolean offHeap,
- PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory
statsHistory, String partitionColumn,
- PartitionFunction partitionFunction, int partitionId, boolean
aggregateMetrics, boolean nullHandlingEnabled,
- String consumerDir, UpsertConfig.Mode upsertMode, List<String>
upsertComparisonColumns,
- String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn,
boolean upsertDropOutOfOrderRecord,
- PartitionUpsertMetadataManager partitionUpsertMetadataManager, String
dedupTimeColumn,
- PartitionDedupMetadataManager partitionDedupMetadataManager,
List<FieldConfig> fieldConfigList,
- List<AggregationConfig> ingestionAggregationConfigs) {
+ SegmentZKMetadata segmentZKMetadata, boolean offHeap,
PinotDataBufferMemoryManager memoryManager,
+ RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction,
+ int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir,
+ UpsertConfig.Mode upsertMode, UpsertConfig.ConsistencyMode
upsertConsistencyMode,
+ List<String> upsertComparisonColumns, String upsertDeleteRecordColumn,
String upsertOutOfOrderRecordColumn,
+ boolean upsertDropOutOfOrderRecord, PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
+ String dedupTimeColumn, PartitionDedupMetadataManager
partitionDedupMetadataManager,
+ List<FieldConfig> fieldConfigList, List<AggregationConfig>
ingestionAggregationConfigs) {
_tableNameWithType = tableNameWithType;
_segmentName = segmentName;
_streamName = streamName;
@@ -101,6 +102,7 @@ public class RealtimeSegmentConfig {
_nullHandlingEnabled = nullHandlingEnabled;
_consumerDir = consumerDir;
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
+ _upsertConsistencyMode = upsertConsistencyMode != null ?
upsertConsistencyMode : UpsertConfig.ConsistencyMode.NONE;
_upsertComparisonColumns = upsertComparisonColumns;
_upsertDeleteRecordColumn = upsertDeleteRecordColumn;
_upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
@@ -188,6 +190,10 @@ public class RealtimeSegmentConfig {
return _upsertMode;
}
+ public UpsertConfig.ConsistencyMode getUpsertConsistencyMode() {
+ return _upsertConsistencyMode;
+ }
+
public boolean isDedupEnabled() {
return _partitionDedupMetadataManager != null;
}
@@ -248,6 +254,7 @@ public class RealtimeSegmentConfig {
private boolean _nullHandlingEnabled = false;
private String _consumerDir;
private UpsertConfig.Mode _upsertMode;
+ private UpsertConfig.ConsistencyMode _upsertConsistencyMode;
private List<String> _upsertComparisonColumns;
private String _upsertDeleteRecordColumn;
private String _upsertOutOfOrderRecordColumn;
@@ -383,6 +390,11 @@ public class RealtimeSegmentConfig {
return this;
}
+ public Builder setUpsertConsistencyMode(UpsertConfig.ConsistencyMode
upsertConsistencyMode) {
+ _upsertConsistencyMode = upsertConsistencyMode;
+ return this;
+ }
+
public Builder setUpsertComparisonColumns(List<String>
upsertComparisonColumns) {
_upsertComparisonColumns = upsertComparisonColumns;
return this;
@@ -437,8 +449,8 @@ public class RealtimeSegmentConfig {
return new RealtimeSegmentConfig(_tableNameWithType, _segmentName,
_streamName, _schema, _timeColumnName,
_capacity, _avgNumMultiValues,
Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction,
_partitionId, _aggregateMetrics,
- _nullHandlingEnabled, _consumerDir, _upsertMode,
_upsertComparisonColumns, _upsertDeleteRecordColumn,
- _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
+ _nullHandlingEnabled, _consumerDir, _upsertMode,
_upsertConsistencyMode, _upsertComparisonColumns,
+ _upsertDeleteRecordColumn, _upsertOutOfOrderRecordColumn,
_upsertDropOutOfOrderRecord,
_partitionUpsertMetadataManager, _dedupTimeColumn,
_partitionDedupMetadataManager, _fieldConfigList,
_ingestionAggregationConfigs);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6951fc8197..880a7ace86 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -33,7 +33,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -58,7 +57,6 @@ import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -70,7 +68,6 @@ import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
-import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.V1Constants;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
@@ -138,19 +135,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
private final Lock _preloadLock = new ReentrantLock();
private volatile boolean _isPreloading;
- // There are two consistency modes:
- // If using SYNC mode, the upsert threads take the WLock when the upsert
involves two segments' bitmaps; and
- // the query threads take the RLock when getting bitmaps for all its
selected segments.
- // If using SNAPSHOT mode, the query threads don't need to take lock when
getting bitmaps for all its selected
- // segments, as the query threads access a copy of bitmaps that are kept
updated by upsert thread periodically. But
- // the query thread can specify a freshness threshold query option to
refresh the bitmap copies if not fresh enough.
- // By default, the mode is NONE to disable the support for data consistency.
- private final UpsertConfig.ConsistencyMode _consistencyMode;
- private final long _upsertViewRefreshIntervalMs;
- private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock();
- private final Set<IndexSegment> _updatedSegmentsSinceLastRefresh =
ConcurrentHashMap.newKeySet();
- private volatile long _lastUpsertViewRefreshTimeMs = 0;
- private volatile Map<IndexSegment, MutableRoaringBitmap>
_segmentQueryableDocIdsMap;
+ // By default, the upsert consistency mode is NONE and upsertViewManager is
disabled.
+ private final UpsertViewManager _upsertViewManager;
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, UpsertContext context) {
_tableNameWithType = tableNameWithType;
@@ -168,8 +154,11 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
_deletedKeysTTL = context.getDeletedKeysTTL();
_tableIndexDir = context.getTableIndexDir();
UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
- _consistencyMode = cmode != null ? cmode :
UpsertConfig.ConsistencyMode.NONE;
- _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
+ if (cmode == UpsertConfig.ConsistencyMode.SYNC || cmode ==
UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ _upsertViewManager = new UpsertViewManager(cmode, context);
+ } else {
+ _upsertViewManager = null;
+ }
_serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
@@ -706,8 +695,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// (1) skip loading segments without any invalid docs.
// (2) assign the invalid docs from the replaced segment to the new
segment.
String segmentName = segment.getSegmentName();
- MutableRoaringBitmap validDocIdsForOldSegment =
- oldSegment.getValidDocIds() != null ?
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+ MutableRoaringBitmap validDocIdsForOldSegment = null;
+ if (_upsertViewManager == null) {
+ // When not using consistency mode, we use a copy of the validDocIds
bitmap of the old segment to keep the old
+ // segment intact during segment replacement and queries access the old
segment during segment replacement.
+ validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+ }
if (recordInfoIterator != null) {
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: {} for segment: {}, table:
{}", segment.getClass(), segmentName,
@@ -721,7 +714,11 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
queryableDocIds, recordInfoIterator, oldSegment,
validDocIdsForOldSegment);
}
-
+ if (_upsertViewManager != null) {
+ // When using consistency mode, the old segment's bitmap is updated in
place, so we get the validDocIds after
+ // segment replacement is done.
+ validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+ }
if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
if (_partialUpsertHandler != null) {
@@ -740,6 +737,10 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment
oldSegment) {
+ return oldSegment.getValidDocIds() != null ?
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+ }
+
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment,
_primaryKeyColumns)) {
removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
@@ -1132,31 +1133,12 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
protected void replaceDocId(IndexSegment newSegment,
ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment
oldSegment, int oldDocId, int newDocId,
RecordInfo recordInfo) {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
- _upsertViewLock.writeLock().lock();
- try {
- doRemoveDocId(oldSegment, oldDocId);
- doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
- } finally {
- _upsertViewLock.writeLock().unlock();
- }
- } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
- _upsertViewLock.readLock().lock();
- try {
- doRemoveDocId(oldSegment, oldDocId);
- doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
- _updatedSegmentsSinceLastRefresh.add(newSegment);
- _updatedSegmentsSinceLastRefresh.add(oldSegment);
- } finally {
- _upsertViewLock.readLock().unlock();
- // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W
lock ensures that only one thread
- // can refresh the bitmaps. The other threads that are about to update
the bitmaps will be blocked until
- // refreshing is done.
- doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
- }
+ if (_upsertViewManager == null) {
+ UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
+ UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId,
recordInfo);
} else {
- doRemoveDocId(oldSegment, oldDocId);
- doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ _upsertViewManager.replaceDocId(newSegment, validDocIds,
queryableDocIds, oldSegment, oldDocId, newDocId,
+ recordInfo);
}
trackUpdatedSegmentsSinceLastSnapshot(oldSegment);
}
@@ -1169,71 +1151,27 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
*/
protected void replaceDocId(IndexSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId,
int newDocId, RecordInfo recordInfo) {
- if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
- doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId,
recordInfo);
+ if (_upsertViewManager == null) {
+ UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId,
newDocId, recordInfo);
} else {
- _upsertViewLock.readLock().lock();
- try {
- doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId,
recordInfo);
- _updatedSegmentsSinceLastRefresh.add(segment);
- } finally {
- _upsertViewLock.readLock().unlock();
- // Batch refresh takes WLock. Do it outside RLock for clarity.
- doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
- }
- }
- }
-
- private void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId,
int newDocId, RecordInfo recordInfo) {
- validDocIds.replace(oldDocId, newDocId);
- if (queryableDocIds != null) {
- if (recordInfo.isDeleteRecord()) {
- queryableDocIds.remove(oldDocId);
- } else {
- queryableDocIds.replace(oldDocId, newDocId);
- }
+ _upsertViewManager.replaceDocId(segment, validDocIds, queryableDocIds,
oldDocId, newDocId, recordInfo);
}
}
protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap
validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId,
RecordInfo recordInfo) {
- if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
- doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
+ if (_upsertViewManager == null) {
+ UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
} else {
- _upsertViewLock.readLock().lock();
- try {
- doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
- _updatedSegmentsSinceLastRefresh.add(segment);
- } finally {
- _upsertViewLock.readLock().unlock();
- // Batch refresh takes WLock. Do it outside RLock for clarity.
- doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
- }
- }
- }
-
- private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId,
RecordInfo recordInfo) {
- validDocIds.add(docId);
- if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
- queryableDocIds.add(docId);
+ _upsertViewManager.addDocId(segment, validDocIds, queryableDocIds,
docId, recordInfo);
}
}
protected void removeDocId(IndexSegment segment, int docId) {
- if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
- doRemoveDocId(segment, docId);
+ if (_upsertViewManager == null) {
+ UpsertUtils.doRemoveDocId(segment, docId);
} else {
- _upsertViewLock.readLock().lock();
- try {
- doRemoveDocId(segment, docId);
- _updatedSegmentsSinceLastRefresh.add(segment);
- } finally {
- _upsertViewLock.readLock().unlock();
- // Batch refresh takes WLock. Do it outside RLock for clarity.
- doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
- }
+ _upsertViewManager.removeDocId(segment, docId);
}
trackUpdatedSegmentsSinceLastSnapshot(segment);
}
@@ -1249,112 +1187,25 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
}
}
- private void doRemoveDocId(IndexSegment segment, int docId) {
- Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
- ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
segment.getQueryableDocIds();
- if (currentQueryableDocIds != null) {
- currentQueryableDocIds.remove(docId);
- }
- }
-
- /**
- * Use the segmentContexts to collect the contexts for selected segments.
Reuse the segmentContext object if
- * present, to avoid overwriting the contexts specified at the others places.
- */
- public void setSegmentContexts(List<SegmentContext> segmentContexts,
Map<String, String> queryOptions) {
- if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
- setSegmentContexts(segmentContexts);
- return;
- }
- if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
- _upsertViewLock.readLock().lock();
- try {
- setSegmentContexts(segmentContexts);
- return;
- } finally {
- _upsertViewLock.readLock().unlock();
- }
- }
- // If batch refresh is enabled, the copy of bitmaps is kept updated and
ready to use for a consistent view.
- // The locking between query threads and upsert threads can be avoided
when using batch refresh.
- // Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps
by every single query.
- // If query has specified a need for certain freshness, check the view and
refresh it as needed.
- // When refreshing the copy of map, we need to take the WLock so only one
thread is refreshing view.
- long upsertViewFreshnessMs =
- Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions),
_upsertViewRefreshIntervalMs);
- if (upsertViewFreshnessMs < 0) {
- upsertViewFreshnessMs = _upsertViewRefreshIntervalMs;
- }
- doBatchRefreshUpsertView(upsertViewFreshnessMs);
- Map<IndexSegment, MutableRoaringBitmap> currentUpsertView =
_segmentQueryableDocIdsMap;
- for (SegmentContext segmentContext : segmentContexts) {
- IndexSegment segment = segmentContext.getIndexSegment();
- MutableRoaringBitmap segmentView = currentUpsertView.get(segment);
- if (segmentView != null) {
- segmentContext.setQueryableDocIdsSnapshot(segmentView);
- }
- }
+ protected void doClose()
+ throws IOException {
}
- private void setSegmentContexts(List<SegmentContext> segmentContexts) {
- for (SegmentContext segmentContext : segmentContexts) {
- IndexSegment segment = segmentContext.getIndexSegment();
- if (_trackedSegments.contains(segment)) {
-
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
- }
- }
+ public UpsertViewManager getUpsertViewManager() {
+ return _upsertViewManager;
}
- private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) {
- if (upsertViewFreshnessMs < 0) {
- return true;
+ @Override
+ public void trackSegmentForUpsertView(IndexSegment segment) {
+ if (_upsertViewManager != null) {
+ _upsertViewManager.trackSegment(segment);
}
- return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs >
System.currentTimeMillis();
}
- private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) {
- // Always refresh if the current view is still empty.
- if (skipUpsertViewRefresh(upsertViewFreshnessMs) &&
_segmentQueryableDocIdsMap != null) {
- return;
- }
- _upsertViewLock.writeLock().lock();
- try {
- // Check again with lock, and always refresh if the current view is
still empty.
- Map<IndexSegment, MutableRoaringBitmap> current =
_segmentQueryableDocIdsMap;
- if (skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) {
- return;
- }
- Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
- for (IndexSegment segment : _trackedSegments) {
- // Update bitmap for segment updated since last refresh or not in the
view yet. This also handles segments
- // that are tracked by _trackedSegments but not by
_updatedSegmentsSinceLastRefresh, like those didn't update
- // any bitmaps as their docs simply lost all the upsert comparisons
with the existing docs.
- if (current == null || current.get(segment) == null ||
_updatedSegmentsSinceLastRefresh.contains(segment)) {
- updated.put(segment,
UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
- } else {
- updated.put(segment, current.get(segment));
- }
- }
- // Swap in the new consistent set of bitmaps.
- _segmentQueryableDocIdsMap = updated;
- _updatedSegmentsSinceLastRefresh.clear();
- _lastUpsertViewRefreshTimeMs = System.currentTimeMillis();
- } finally {
- _upsertViewLock.writeLock().unlock();
+ @Override
+ public void untrackSegmentForUpsertView(IndexSegment segment) {
+ if (_upsertViewManager != null) {
+ _upsertViewManager.untrackSegment(segment);
}
}
-
- @VisibleForTesting
- Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
- return _segmentQueryableDocIdsMap;
- }
-
- @VisibleForTesting
- Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
- return _updatedSegmentsSinceLastRefresh;
- }
-
- protected void doClose()
- throws IOException {
- }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 400519079b..78a76474b6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -111,6 +111,11 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL
: UpsertConfig.Mode.PARTIAL;
}
+ @Override
+ public UpsertConfig.ConsistencyMode getUpsertConsistencyMode() {
+ return _consistencyMode;
+ }
+
@Override
public boolean isEnablePreload() {
return _enablePreload;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index a0bf20a2de..ba24f2c912 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -103,9 +103,14 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
// snapshot for the old segment, which can be updated and used
to track the docs not replaced yet.
if (currentSegment == oldSegment) {
if (comparisonResult >= 0) {
- addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- if (validDocIdsForOldSegment != null) {
- validDocIdsForOldSegment.remove(currentDocId);
+ if (validDocIdsForOldSegment == null && oldSegment != null
&& oldSegment.getValidDocIds() != null) {
+ // Update the old segment's bitmap in place if a copy of
the bitmap was not provided.
+ replaceDocId(segment, validDocIds, queryableDocIds,
oldSegment, currentDocId, newDocId, recordInfo);
+ } else {
+ addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ if (validDocIdsForOldSegment != null) {
+ validDocIdsForOldSegment.remove(currentDocId);
+ }
}
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 7054b1c030..b49d09e04b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -120,9 +120,14 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
// snapshot for the old segment, which can be updated and used
to track the docs not replaced yet.
if (currentSegment == oldSegment) {
if (comparisonResult >= 0) {
- addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- if (validDocIdsForOldSegment != null) {
- validDocIdsForOldSegment.remove(currentDocId);
+ if (validDocIdsForOldSegment == null && oldSegment != null
&& oldSegment.getValidDocIds() != null) {
+ // Update the old segment's bitmap in place if a copy of
the bitmap was not provided.
+ replaceDocId(segment, validDocIds, queryableDocIds,
oldSegment, currentDocId, newDocId, recordInfo);
+ } else {
+ addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ if (validDocIdsForOldSegment != null) {
+ validDocIdsForOldSegment.remove(currentDocId);
+ }
}
return new RecordLocation(segment, newDocId,
newComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 45a6487a82..7280b86833 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -20,14 +20,19 @@ package org.apache.pinot.segment.local.upsert;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -35,6 +40,8 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
*/
@ThreadSafe
public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMetadataManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConcurrentMapTableUpsertMetadataManager.class);
+
private final Map<Integer, BasePartitionUpsertMetadataManager>
_partitionMetadataManagerMap =
new ConcurrentHashMap<>();
@@ -62,22 +69,54 @@ public class ConcurrentMapTableUpsertMetadataManager
extends BaseTableUpsertMeta
return partitionToPrimaryKeyCount;
}
+ @Override
+ public void lockForSegmentContexts() {
+ _partitionMetadataManagerMap.forEach(
+ (partitionID, upsertMetadataManager) ->
upsertMetadataManager.getUpsertViewManager().lockTrackedSegments());
+ }
+
+ @Override
+ public void unlockForSegmentContexts() {
+ _partitionMetadataManagerMap.forEach(
+ (partitionID, upsertMetadataManager) ->
upsertMetadataManager.getUpsertViewManager().unlockTrackedSegments());
+ }
+
+ @Override
+ public Set<String> getOptionalSegments() {
+ Set<String> optionalSegments = new HashSet<>();
+ _partitionMetadataManagerMap.forEach((partitionID, upsertMetadataManager)
-> optionalSegments.addAll(
+ upsertMetadataManager.getUpsertViewManager().getOptionalSegments()));
+ return optionalSegments;
+ }
+
@Override
public void setSegmentContexts(List<SegmentContext> segmentContexts,
Map<String, String> queryOptions) {
- if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE &&
!QueryOptionsUtils.isSkipUpsertView(queryOptions)) {
- // Get queryableDocIds bitmaps from partitionMetadataManagers if any
consistency mode is used.
- _partitionMetadataManagerMap.forEach(
- (partitionID, upsertMetadataManager) ->
upsertMetadataManager.setSegmentContexts(segmentContexts,
- queryOptions));
- }
- // If no consistency mode is used, we get queryableDocIds bitmaps as kept
by the segment objects directly.
- // Even if consistency mode is used, we should still check if any segment
doesn't get its validDocIds bitmap,
- // because partitionMetadataManagers may not track all segments of the
table, like those out of the metadata TTL.
- for (SegmentContext segmentContext : segmentContexts) {
- if (segmentContext.getQueryableDocIdsSnapshot() == null) {
+ // Get queryableDocIds bitmaps from partitionMetadataManagers if any
consistency mode is used.
+ // Otherwise, get queryableDocIds bitmaps as kept by the segment objects
directly as before.
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE ||
QueryOptionsUtils.isSkipUpsertView(queryOptions)) {
+ for (SegmentContext segmentContext : segmentContexts) {
IndexSegment segment = segmentContext.getIndexSegment();
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
}
+ return;
+ }
+ // All segments should have been tracked by partitionMetadataManagers to
provide queries consistent upsert view.
+ _partitionMetadataManagerMap.forEach(
+ (partitionID, upsertMetadataManager) ->
upsertMetadataManager.getUpsertViewManager()
+ .setSegmentContexts(segmentContexts, queryOptions));
+ if (LOGGER.isDebugEnabled()) {
+ for (SegmentContext segmentContext : segmentContexts) {
+ IndexSegment segment = segmentContext.getIndexSegment();
+ if (segmentContext.getQueryableDocIdsSnapshot() == null) {
+ LOGGER.debug("No upsert view for segment: {}, type: {}, total: {}",
segment.getSegmentName(),
+ (segment instanceof ImmutableSegment ? "imm" : "mut"),
segment.getSegmentMetadata().getTotalDocs());
+ } else {
+ int cardCnt =
segmentContext.getQueryableDocIdsSnapshot().getCardinality();
+ LOGGER.debug("Got upsert view of segment: {}, type: {}, total: {},
valid: {}", segment.getSegmentName(),
+ (segment instanceof ImmutableSegment ? "imm" : "mut"),
segment.getSegmentMetadata().getTotalDocs(),
+ cardCnt);
+ }
+ }
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 1c72470242..56c1ca7e14 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -116,4 +116,17 @@ public interface PartitionUpsertMetadataManager extends
Closeable {
* Stops the metadata manager. After invoking this method, no access to the
metadata will be accepted.
*/
void stop();
+
+ /**
+ * Track segments for upsert view, and this method must be called before
registering segment to the table manager,
+ * so that the segment is included in the upsert view before it becomes
visible to the query.
+ */
+ void trackSegmentForUpsertView(IndexSegment segment);
+
+ /**
+ * Untrack segments for upsert view, and this method must be called when
segment is to be destroyed, when the
+ * segment is not used by any queries. Untrack segment after unregistering
the segment is not safe, as there may be
+ * ongoing queries that are still accessing the segment.
+ */
+ void untrackSegmentForUpsertView(IndexSegment segment);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 123e3db553..b16c66af50 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -19,8 +19,10 @@
package org.apache.pinot.segment.local.upsert;
import java.io.Closeable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.SegmentContext;
@@ -41,6 +43,8 @@ public interface TableUpsertMetadataManager extends Closeable
{
UpsertConfig.Mode getUpsertMode();
+ UpsertConfig.ConsistencyMode getUpsertConsistencyMode();
+
boolean isEnablePreload();
/**
@@ -57,4 +61,14 @@ public interface TableUpsertMetadataManager extends
Closeable {
default void setSegmentContexts(List<SegmentContext> segmentContexts,
Map<String, String> queryOptions) {
}
+
+ default void lockForSegmentContexts() {
+ }
+
+ default void unlockForSegmentContexts() {
+ }
+
+ default Set<String> getOptionalSegments() {
+ return Collections.emptySet();
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 3086da1969..42c20f8aad 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
@@ -41,12 +42,48 @@ public class UpsertUtils {
@Nullable
public static MutableRoaringBitmap
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
+ return getQueryableDocIdsSnapshotFromSegment(segment, false);
+ }
+
+ public static MutableRoaringBitmap
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment,
+ boolean useEmptyForNull) {
ThreadSafeMutableRoaringBitmap queryableDocIds =
segment.getQueryableDocIds();
if (queryableDocIds != null) {
return queryableDocIds.getMutableRoaringBitmap();
}
ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
- return validDocIds != null ? validDocIds.getMutableRoaringBitmap() : null;
+ return validDocIds != null ? validDocIds.getMutableRoaringBitmap()
+ : (useEmptyForNull ? new MutableRoaringBitmap() : null);
+ }
+
+
+ public static void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId,
int newDocId, RecordInfo recordInfo) {
+ validDocIds.replace(oldDocId, newDocId);
+ if (queryableDocIds != null) {
+ if (recordInfo.isDeleteRecord()) {
+ queryableDocIds.remove(oldDocId);
+ } else {
+ queryableDocIds.replace(oldDocId, newDocId);
+ }
+ }
+ }
+
+ public static void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId,
RecordInfo recordInfo) {
+ validDocIds.add(docId);
+ if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
+ queryableDocIds.add(docId);
+ }
+ }
+
+
+ public static void doRemoveDocId(IndexSegment segment, int docId) {
+ Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
segment.getQueryableDocIds();
+ if (currentQueryableDocIds != null) {
+ currentQueryableDocIds.remove(docId);
+ }
}
/**
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertViewManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertViewManager.java
new file mode 100644
index 0000000000..009eac7969
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertViewManager.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is used to provide the specified consistency mode for upsert
table by tracking the segments and
+ * synchronizing the accesses to the validDocIds of those tracked segments
properly. Two consistency modes are
+ * supported currently:
+ * - SYNC mode, the upsert threads take the WLock when the upsert involves two
segments' bitmaps; and the query
+ * threads take the RLock when getting bitmaps for all its selected segments.
+ * - SNAPSHOT mode, the query threads don't need to take lock when getting
bitmaps for all its selected segments, as
+ * the query threads access a copy of bitmaps that are kept updated by upsert
thread periodically. But the query
+ * thread can specify a freshness threshold query option to refresh the bitmap
copies if not fresh enough.
+ */
+public class UpsertViewManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UpsertViewManager.class);
+ private final UpsertConfig.ConsistencyMode _consistencyMode;
+
+ // NOTE that we can't reuse _trackedSegments map in
BasePartitionUpsertMetadataManager, as it doesn't track all
+ // segments like those out of the metadata TTL, and it's called after adding
segments to the table manager so the
+ // new segments become queryable before upsert view can get updated. So we
use a separate map to track the segments
+ // properly. Besides, updating the set of tracked segments must be
synchronized with queries getting segment
+ // contexts, so the need of the R/W lock.
+ private final ReadWriteLock _trackedSegmentsLock = new
ReentrantReadWriteLock();
+ private final Set<IndexSegment> _trackedSegments =
ConcurrentHashMap.newKeySet();
+ // Optional segments are part of the tracked segments. They can get
processed by server before getting included in
+ // broker's routing table, like the new consuming segment. Although broker
misses such segments, the server needs
+ // to acquire them to avoid missing the new valid docs in them.
+ private final Set<String> _optionalSegments = ConcurrentHashMap.newKeySet();
+
+ // Updating and accessing segments' validDocIds bitmaps are synchronized
with a separate R/W lock for clarity.
+ // The query threads always get _upsertViewTrackedSegmentsLock then
_upsertViewSegmentDocIdsLock to avoid deadlock.
+ // And the upsert threads never nest the two locks.
+ private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock();
+ private volatile Map<IndexSegment, MutableRoaringBitmap>
_segmentQueryableDocIdsMap;
+
+ // For SNAPSHOT mode, track segments that get new updates since last refresh
to reduce the overhead of refreshing.
+ private final Set<IndexSegment> _updatedSegmentsSinceLastRefresh =
ConcurrentHashMap.newKeySet();
+ private volatile long _lastUpsertViewRefreshTimeMs = 0;
+ private final long _upsertViewRefreshIntervalMs;
+
+ public UpsertViewManager(UpsertConfig.ConsistencyMode consistencyMode,
UpsertContext context) {
+ _consistencyMode = consistencyMode;
+ _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
+ }
+
+ public void replaceDocId(IndexSegment newSegment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
int oldDocId, int newDocId,
+ RecordInfo recordInfo) {
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+ _upsertViewLock.writeLock().lock();
+ try {
+ UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
+ UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId,
recordInfo);
+ return;
+ } finally {
+ _upsertViewLock.writeLock().unlock();
+ }
+ }
+ // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+ _upsertViewLock.readLock().lock();
+ try {
+ UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
+ UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId,
recordInfo);
+ _updatedSegmentsSinceLastRefresh.add(newSegment);
+ _updatedSegmentsSinceLastRefresh.add(oldSegment);
+ } finally {
+ _upsertViewLock.readLock().unlock();
+ // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W
lock ensures that only one thread
+ // can refresh the bitmaps. The other threads that are about to update
the bitmaps will be blocked until
+ // refreshing is done.
+ doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+ }
+ }
+
+ public void replaceDocId(IndexSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int
newDocId, RecordInfo recordInfo) {
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+ UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId,
newDocId, recordInfo);
+ return;
+ }
+ // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+ _upsertViewLock.readLock().lock();
+ try {
+ UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId,
newDocId, recordInfo);
+ _updatedSegmentsSinceLastRefresh.add(segment);
+ } finally {
+ _upsertViewLock.readLock().unlock();
+ // Batch refresh takes WLock. Do it outside RLock for clarity.
+ doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+ }
+ }
+
+ public void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap
validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo
recordInfo) {
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+ UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
+ return;
+ }
+ // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+ _upsertViewLock.readLock().lock();
+ try {
+ UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
+ _updatedSegmentsSinceLastRefresh.add(segment);
+ } finally {
+ _upsertViewLock.readLock().unlock();
+ // Batch refresh takes WLock. Do it outside RLock for clarity.
+ doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+ }
+ }
+
+ public void removeDocId(IndexSegment segment, int docId) {
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+ UpsertUtils.doRemoveDocId(segment, docId);
+ return;
+ }
+ // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+ _upsertViewLock.readLock().lock();
+ try {
+ UpsertUtils.doRemoveDocId(segment, docId);
+ _updatedSegmentsSinceLastRefresh.add(segment);
+ } finally {
+ _upsertViewLock.readLock().unlock();
+ // Batch refresh takes WLock. Do it outside RLock for clarity.
+ doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+ }
+ }
+
+ /**
+ * Use the segmentContexts to collect the contexts for selected segments.
Reuse the segmentContext object if
+ * present, to avoid overwriting the contexts specified at the others places.
+ */
+ public void setSegmentContexts(List<SegmentContext> segmentContexts,
Map<String, String> queryOptions) {
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+ _upsertViewLock.readLock().lock();
+ try {
+ setSegmentContexts(segmentContexts);
+ return;
+ } finally {
+ _upsertViewLock.readLock().unlock();
+ }
+ }
+ // If batch refresh is enabled, the copy of bitmaps is kept updated and
ready to use for a consistent view.
+ // The locking between query threads and upsert threads can be avoided
when using batch refresh.
+ // Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps
by every single query.
+ // If query has specified a need for certain freshness, check the view and
refresh it as needed.
+ // When refreshing the copy of map, we need to take the WLock so only one
thread is refreshing view.
+ long upsertViewFreshnessMs =
+ Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions),
_upsertViewRefreshIntervalMs);
+ if (upsertViewFreshnessMs < 0) {
+ upsertViewFreshnessMs = _upsertViewRefreshIntervalMs;
+ }
+ doBatchRefreshUpsertView(upsertViewFreshnessMs, false);
+ Map<IndexSegment, MutableRoaringBitmap> currentUpsertView =
_segmentQueryableDocIdsMap;
+ for (SegmentContext segmentContext : segmentContexts) {
+ IndexSegment segment = segmentContext.getIndexSegment();
+ MutableRoaringBitmap segmentView = currentUpsertView.get(segment);
+ if (segmentView != null) {
+ segmentContext.setQueryableDocIdsSnapshot(segmentView);
+ }
+ }
+ }
+
+ private void setSegmentContexts(List<SegmentContext> segmentContexts) {
+ for (SegmentContext segmentContext : segmentContexts) {
+ IndexSegment segment = segmentContext.getIndexSegment();
+ if (_trackedSegments.contains(segment)) {
+
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment,
true));
+ }
+ }
+ }
+
+ private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) {
+ if (upsertViewFreshnessMs < 0) {
+ return true;
+ }
+ return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs >
System.currentTimeMillis();
+ }
+
+ @VisibleForTesting
+ void doBatchRefreshUpsertView(long upsertViewFreshnessMs, boolean
forceRefresh) {
+ // Always refresh if the current view is still empty.
+ if (!forceRefresh && skipUpsertViewRefresh(upsertViewFreshnessMs) &&
_segmentQueryableDocIdsMap != null) {
+ return;
+ }
+ _upsertViewLock.writeLock().lock();
+ try {
+ // Check again with lock, and always refresh if the current view is
still empty.
+ Map<IndexSegment, MutableRoaringBitmap> current =
_segmentQueryableDocIdsMap;
+ if (!forceRefresh && skipUpsertViewRefresh(upsertViewFreshnessMs) &&
current != null) {
+ return;
+ }
+ if (LOGGER.isDebugEnabled()) {
+ if (current == null) {
+ LOGGER.debug("Current upsert view is still null");
+ } else {
+ current.forEach(
+ (segment, bitmap) -> LOGGER.debug("Current upsert view of
segment: {}, type: {}, total: {}, valid: {}",
+ segment.getSegmentName(), (segment instanceof
ImmutableSegment ? "imm" : "mut"),
+ segment.getSegmentMetadata().getTotalDocs(),
bitmap.getCardinality()));
+ }
+ }
+ Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
+ for (IndexSegment segment : _trackedSegments) {
+ // Update bitmap for segment updated since last refresh or not in the
view yet. This also handles segments
+ // that are tracked by _trackedSegments but not by
_updatedSegmentsSinceLastRefresh, like those didn't update
+ // any bitmaps as their docs simply lost all the upsert comparisons
with the existing docs.
+ if (current == null || current.get(segment) == null ||
_updatedSegmentsSinceLastRefresh.contains(segment)) {
+ updated.put(segment,
UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment, true));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Update upsert view of segment: {}, type: {}, total:
{}, valid: {}, reason: {}",
+ segment.getSegmentName(), (segment instanceof ImmutableSegment
? "imm" : "mut"),
+ segment.getSegmentMetadata().getTotalDocs(),
updated.get(segment).getCardinality(),
+ current == null || current.get(segment) == null ? "no view
yet" : "bitmap updated");
+ }
+ } else {
+ updated.put(segment, current.get(segment));
+ }
+ }
+ // Swap in the new consistent set of bitmaps.
+ if (LOGGER.isDebugEnabled()) {
+ updated.forEach(
+ (segment, bitmap) -> LOGGER.debug("Updated upsert view of segment:
{}, type: {}, total: {}, valid: {}",
+ segment.getSegmentName(), (segment instanceof ImmutableSegment
? "imm" : "mut"),
+ segment.getSegmentMetadata().getTotalDocs(),
bitmap.getCardinality()));
+ }
+ _segmentQueryableDocIdsMap = updated;
+ _updatedSegmentsSinceLastRefresh.clear();
+ _lastUpsertViewRefreshTimeMs = System.currentTimeMillis();
+ } finally {
+ _upsertViewLock.writeLock().unlock();
+ }
+ }
+
+ public void lockTrackedSegments() {
+ _trackedSegmentsLock.readLock().lock();
+ }
+
+ public void unlockTrackedSegments() {
+ _trackedSegmentsLock.readLock().unlock();
+ }
+
+ public Set<String> getOptionalSegments() {
+ return _optionalSegments;
+ }
+
+ public void trackSegment(IndexSegment segment) {
+ _trackedSegmentsLock.writeLock().lock();
+ try {
+ _trackedSegments.add(segment);
+ if (segment instanceof MutableSegment) {
+ _optionalSegments.add(segment.getSegmentName());
+ }
+ if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+ // Note: it's possible the segment is already tracked and the
_trackedSegments doesn't really change here. But
+ // we should force to refresh the upsert view to include the latest
bitmaps of the segments. This is
+ // important to fix a subtle race condition when commiting mutable
segment. During segment replacement, the
+ // queries can access both mutable and immutable segments. But as
replacement is done, the new queries can
+ // only access the immutable segment, thus require latest bitmap of
the segment in the upsert view.
+ // It's required to refresh with _trackedSegmentsLock so queries are
blocked until upsert view is updated.
+ doBatchRefreshUpsertView(0, true);
+ }
+ } finally {
+ _trackedSegmentsLock.writeLock().unlock();
+ }
+ }
+
+ public void untrackSegment(IndexSegment segment) {
+ _trackedSegmentsLock.writeLock().lock();
+ try {
+ _trackedSegments.remove(segment);
+ if (segment instanceof MutableSegment) {
+ _optionalSegments.remove(segment.getSegmentName());
+ }
+ // No need to eagerly refresh the upsert view for SNAPSHOT mode when
untracking a segment, as the untracked
+ // segment won't be used by any new queries, thus it can be removed when
next refresh happens later.
+ } finally {
+ _trackedSegmentsLock.writeLock().unlock();
+ }
+ }
+
+ @VisibleForTesting
+ Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
+ return _segmentQueryableDocIdsMap;
+ }
+
+ @VisibleForTesting
+ Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
+ return _updatedSegmentsSinceLastRefresh;
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 9f86aa3326..fda7408e7a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -369,19 +369,19 @@ public class BasePartitionUpsertMetadataManagerTest {
latch.await();
return validDocIds01;
});
- upsertMetadataManager.trackSegment(seg01);
+ upsertMetadataManager.trackSegmentForUpsertView(seg01);
segmentQueryableDocIdsMap.put(seg01, validDocIds01);
IndexSegment seg02 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds02 =
createThreadSafeMutableRoaringBitmap(11);
when(seg02.getValidDocIds()).thenReturn(validDocIds02);
- upsertMetadataManager.trackSegment(seg02);
+ upsertMetadataManager.trackSegmentForUpsertView(seg02);
segmentQueryableDocIdsMap.put(seg02, validDocIds02);
IndexSegment seg03 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds03 =
createThreadSafeMutableRoaringBitmap(12);
when(seg03.getValidDocIds()).thenReturn(validDocIds03);
- upsertMetadataManager.trackSegment(seg03);
+ upsertMetadataManager.trackSegmentForUpsertView(seg03);
segmentQueryableDocIdsMap.put(seg03, validDocIds03);
List<SegmentContext> segmentContexts = new ArrayList<>();
@@ -400,7 +400,7 @@ public class BasePartitionUpsertMetadataManagerTest {
Thread.sleep(10);
}
segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new
SegmentContext(k)));
- upsertMetadataManager.setSegmentContexts(segmentContexts, new
HashMap<>());
+
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(segmentContexts,
new HashMap<>());
return System.currentTimeMillis() - startMs;
});
// The first thread can only finish after the latch is counted down
after 2s.
@@ -435,7 +435,7 @@ public class BasePartitionUpsertMetadataManagerTest {
throws Exception {
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SNAPSHOT);
- when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L);
+ when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(5L);
DummyPartitionUpsertMetadataManager upsertMetadataManager =
new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
@@ -443,31 +443,36 @@ public class BasePartitionUpsertMetadataManagerTest {
Map<IndexSegment, ThreadSafeMutableRoaringBitmap>
segmentQueryableDocIdsMap = new HashMap<>();
IndexSegment seg01 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds01 =
createThreadSafeMutableRoaringBitmap(10);
- AtomicBoolean called = new AtomicBoolean(false);
- when(seg01.getValidDocIds()).then(invocationOnMock -> {
- called.set(true);
- latch.await();
- return validDocIds01;
- });
- upsertMetadataManager.trackSegment(seg01);
+ upsertMetadataManager.trackSegmentForUpsertView(seg01);
segmentQueryableDocIdsMap.put(seg01, validDocIds01);
IndexSegment seg02 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds02 =
createThreadSafeMutableRoaringBitmap(11);
when(seg02.getValidDocIds()).thenReturn(validDocIds02);
- upsertMetadataManager.trackSegment(seg02);
+ upsertMetadataManager.trackSegmentForUpsertView(seg02);
segmentQueryableDocIdsMap.put(seg02, validDocIds02);
IndexSegment seg03 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds03 =
createThreadSafeMutableRoaringBitmap(12);
when(seg03.getValidDocIds()).thenReturn(validDocIds03);
- upsertMetadataManager.trackSegment(seg03);
+ upsertMetadataManager.trackSegmentForUpsertView(seg03);
segmentQueryableDocIdsMap.put(seg03, validDocIds03);
List<SegmentContext> segmentContexts = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ // Set up the awaiting logic here to not block the
trackSegmentForUpsertView methods above, as they refresh the
+ // upsert view.
+ AtomicBoolean called = new AtomicBoolean(false);
+ when(seg01.getValidDocIds()).then(invocationOnMock -> {
+ called.set(true);
+ latch.await();
+ return validDocIds01;
+ });
try {
- // This thread does replaceDocId and takes WLock first, and it'll
refresh the upsert view
+ // This thread does replaceDocId and takes WLock first, and it'll
refresh the upsert view Sleep a bit here to
+ // make the upsert view stale before doing the replaceDocId, to force it
to refresh the upsert view.
+ Thread.sleep(10);
executor.submit(() -> {
RecordInfo recordInfo = new RecordInfo(null, 5, null, false);
upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01,
0, 12, recordInfo);
@@ -481,7 +486,7 @@ public class BasePartitionUpsertMetadataManagerTest {
}
segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new
SegmentContext(k)));
// This thread reuses the upsert view refreshed by the first thread
above.
- upsertMetadataManager.setSegmentContexts(segmentContexts, new
HashMap<>());
+
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(segmentContexts,
new HashMap<>());
return System.currentTimeMillis() - startMs;
});
// The first thread can only finish after the latch is counted down
after 2s.
@@ -494,14 +499,14 @@ public class BasePartitionUpsertMetadataManagerTest {
executor.shutdownNow();
}
assertEquals(segmentContexts.size(), 3);
- assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(),
3);
-
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
+
assertEquals(upsertMetadataManager.getUpsertViewManager().getSegmentQueryableDocIdsMap().size(),
3);
+
assertTrue(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().isEmpty());
// Get the upsert view again, and the existing bitmap objects should be
set in segment contexts.
// The segmentContexts initialized above holds the same bitmaps objects as
from the upsert view.
List<SegmentContext> reuseSegmentContexts = new ArrayList<>();
segmentQueryableDocIdsMap.forEach((k, v) -> reuseSegmentContexts.add(new
SegmentContext(k)));
- upsertMetadataManager.setSegmentContexts(reuseSegmentContexts, new
HashMap<>());
+
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(reuseSegmentContexts,
new HashMap<>());
for (SegmentContext reuseSC : reuseSegmentContexts) {
for (SegmentContext sc : segmentContexts) {
if (reuseSC.getIndexSegment() == sc.getIndexSegment()) {
@@ -524,12 +529,13 @@ public class BasePartitionUpsertMetadataManagerTest {
}
// Force refresh the upsert view when getting it, so different bitmap
objects should be set in segment contexts.
-
upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().addAll(Arrays.asList(seg01,
seg02, seg03));
+
upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh()
+ .addAll(Arrays.asList(seg01, seg02, seg03));
List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new
SegmentContext(k)));
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("upsertViewFreshnessMs", "0");
- upsertMetadataManager.setSegmentContexts(refreshSegmentContexts,
queryOptions);
+
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(refreshSegmentContexts,
queryOptions);
for (SegmentContext refreshSC : refreshSegmentContexts) {
for (SegmentContext sc : segmentContexts) {
if (refreshSC.getIndexSegment() == sc.getIndexSegment()) {
@@ -561,25 +567,23 @@ public class BasePartitionUpsertMetadataManagerTest {
DummyPartitionUpsertMetadataManager upsertMetadataManager =
new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
- CountDownLatch latch = new CountDownLatch(1);
Map<IndexSegment, ThreadSafeMutableRoaringBitmap>
segmentQueryableDocIdsMap = new HashMap<>();
IndexSegment seg01 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds01 =
createThreadSafeMutableRoaringBitmap(10);
- AtomicBoolean called = new AtomicBoolean(false);
when(seg01.getValidDocIds()).thenReturn(validDocIds01);
- upsertMetadataManager.trackSegment(seg01);
+ upsertMetadataManager.trackSegmentForUpsertView(seg01);
segmentQueryableDocIdsMap.put(seg01, validDocIds01);
IndexSegment seg02 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds02 =
createThreadSafeMutableRoaringBitmap(11);
when(seg02.getValidDocIds()).thenReturn(validDocIds02);
- upsertMetadataManager.trackSegment(seg02);
+ upsertMetadataManager.trackSegmentForUpsertView(seg02);
segmentQueryableDocIdsMap.put(seg02, validDocIds02);
IndexSegment seg03 = mock(IndexSegment.class);
ThreadSafeMutableRoaringBitmap validDocIds03 =
createThreadSafeMutableRoaringBitmap(12);
when(seg03.getValidDocIds()).thenReturn(validDocIds03);
- upsertMetadataManager.trackSegment(seg03);
+ upsertMetadataManager.trackSegmentForUpsertView(seg03);
segmentQueryableDocIdsMap.put(seg03, validDocIds03);
RecordInfo recordInfo = new RecordInfo(null, 5, null, false);
@@ -587,20 +591,22 @@ public class BasePartitionUpsertMetadataManagerTest {
List<SegmentContext> segmentContexts = new ArrayList<>();
segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new
SegmentContext(k)));
- upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>());
+
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(segmentContexts,
new HashMap<>());
assertEquals(segmentContexts.size(), 3);
- assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(),
3);
+
assertEquals(upsertMetadataManager.getUpsertViewManager().getSegmentQueryableDocIdsMap().size(),
3);
// We can force to refresh the upsert view by clearing up the current
upsert view, even though there are no updated
// segments tracked in _updatedSegmentsSinceLastRefresh.
-
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
- upsertMetadataManager.getSegmentQueryableDocIdsMap().clear();
+
assertEquals(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().size(),
2);
+
assertTrue(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().contains(seg01));
+
assertTrue(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().contains(seg03));
+
upsertMetadataManager.getUpsertViewManager().getSegmentQueryableDocIdsMap().clear();
List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new
SegmentContext(k)));
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("upsertViewFreshnessMs", "0");
- upsertMetadataManager.setSegmentContexts(refreshSegmentContexts,
queryOptions);
+
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(refreshSegmentContexts,
queryOptions);
for (SegmentContext refreshSC : refreshSegmentContexts) {
for (SegmentContext sc : segmentContexts) {
if (refreshSC.getIndexSegment() == sc.getIndexSegment()) {
@@ -612,6 +618,7 @@ public class BasePartitionUpsertMetadataManagerTest {
// The upsert view holds a clone of the original queryableDocIds held by
the segment object.
assertNotSame(refreshSC.getQueryableDocIdsSnapshot(),
validDocIds.getMutableRoaringBitmap());
assertEquals(refreshSC.getQueryableDocIdsSnapshot(),
validDocIds.getMutableRoaringBitmap());
+ assertNotNull(refreshSC.getQueryableDocIdsSnapshot());
// docId=0 in seg01 got invalidated.
if (refreshSC.getIndexSegment() == seg01) {
assertFalse(refreshSC.getQueryableDocIdsSnapshot().contains(0));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/UpsertViewManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/UpsertViewManagerTest.java
new file mode 100644
index 0000000000..dd179028ab
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/UpsertViewManagerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.Collections;
+import java.util.HashMap;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class UpsertViewManagerTest {
+ @Test
+ public void testTrackUntrackSegments() {
+ UpsertViewManager mgr = new
UpsertViewManager(UpsertConfig.ConsistencyMode.SYNC, mock(UpsertContext.class));
+ IndexSegment seg1 = mock(MutableSegment.class);
+ ThreadSafeMutableRoaringBitmap threadSafeMutableRoaringBitmap =
mock(ThreadSafeMutableRoaringBitmap.class);
+ MutableRoaringBitmap mutableRoaringBitmap = new MutableRoaringBitmap();
+
when(threadSafeMutableRoaringBitmap.getMutableRoaringBitmap()).thenReturn(mutableRoaringBitmap);
+ when(seg1.getValidDocIds()).thenReturn(threadSafeMutableRoaringBitmap);
+ when(seg1.getSegmentName()).thenReturn("seg1");
+
+ SegmentContext segCtx1 = new SegmentContext(seg1);
+ mgr.trackSegment(seg1);
+ assertEquals(mgr.getOptionalSegments(),
Collections.singleton(seg1.getSegmentName()));
+ mgr.setSegmentContexts(Collections.singletonList(segCtx1), new
HashMap<>());
+ assertSame(segCtx1.getQueryableDocIdsSnapshot(), mutableRoaringBitmap);
+
+ mgr.untrackSegment(seg1);
+ assertTrue(mgr.getOptionalSegments().isEmpty());
+ segCtx1 = new SegmentContext(seg1);
+ mgr.setSegmentContexts(Collections.singletonList(segCtx1), new
HashMap<>());
+ assertNull(segCtx1.getQueryableDocIdsSnapshot());
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 90338cc7af..9e72b8b76d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -383,6 +383,11 @@ public class TableConfig extends BaseJsonConfig {
return _upsertConfig != null && _upsertConfig.getMode() !=
UpsertConfig.Mode.NONE;
}
+ @JsonIgnore
+ public UpsertConfig.ConsistencyMode getUpsertConsistencyMode() {
+ return _upsertConfig == null ? UpsertConfig.ConsistencyMode.NONE :
_upsertConfig.getConsistencyMode();
+ }
+
@JsonIgnore
@Nullable
public List<String> getUpsertComparisonColumns() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]