This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ec7028e69 handle segments not tracked by partition mgr and add 
skipUpsertView query option (#13415)
5ec7028e69 is described below

commit 5ec7028e69d861104f502f91b649661c312b56d3
Author: Xiaobing <[email protected]>
AuthorDate: Mon Jun 17 17:18:51 2024 -0700

    handle segments not tracked by partition mgr and add skipUpsertView query 
option (#13415)
    
    * handle segments not tracked by partition mgr and add skipUpsertView query 
option for easy debug
---
 .../common/utils/config/QueryOptionsUtils.java     |   4 +
 .../apache/pinot/core/plan/FilterPlanNodeTest.java |   3 +-
 .../java/org/apache/pinot/core/plan/TestUtils.java |  43 --------
 ...adataAndDictionaryAggregationPlanMakerTest.java |   4 +-
 .../upsert/BasePartitionUpsertMetadataManager.java | 117 +++++++++++----------
 .../upsert/BaseTableUpsertMetadataManager.java     |  10 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |  21 +++-
 .../pinot/segment/local/upsert/UpsertUtils.java    |  11 ++
 .../BasePartitionUpsertMetadataManagerTest.java    |  79 +++++++++++++-
 .../apache/pinot/segment/spi/SegmentContext.java   |   5 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 11 files changed, 191 insertions(+), 107 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index a7c45c45d3..12bc750679 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -139,6 +139,10 @@ public class QueryOptionsUtils {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT));
   }
 
+  public static boolean isSkipUpsertView(Map<String, String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT_VIEW));
+  }
+
   public static long getUpsertViewFreshnessMs(Map<String, String> 
queryOptions) {
     String freshnessMsString = 
queryOptions.get(QueryOptionKey.UPSERT_VIEW_FRESHNESS_MS);
     return freshnessMsString != null ? Long.parseLong(freshnessMsString) : -1;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
index 2fe671fa29..4f86f8d85a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/FilterPlanNodeTest.java
@@ -24,6 +24,7 @@ import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.local.upsert.UpsertUtils;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
@@ -71,7 +72,7 @@ public class FilterPlanNodeTest {
     // Result should be invariant - always exactly 3 docs
     for (int i = 0; i < 10_000; i++) {
       SegmentContext segmentContext = new SegmentContext(segment);
-      
segmentContext.setQueryableDocIdsSnapshot(TestUtils.getQueryableDocIdsSnapshotFromSegment(segment));
+      
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
       assertEquals(getNumberOfFilteredDocs(segmentContext, queryContext), 3);
     }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java
deleted file mode 100644
index 171489e5ed..0000000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/TestUtils.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.plan;
-
-import org.apache.pinot.segment.spi.IndexSegment;
-import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-
-
-public class TestUtils {
-  private TestUtils() {
-  }
-
-  public static MutableRoaringBitmap 
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
-    MutableRoaringBitmap queryableDocIdsSnapshot = null;
-    ThreadSafeMutableRoaringBitmap queryableDocIds = 
segment.getQueryableDocIds();
-    if (queryableDocIds != null) {
-      queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
-    } else {
-      ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
-      if (validDocIds != null) {
-        queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
-      }
-    }
-    return queryableDocIdsSnapshot;
-  }
-}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 6d932489e9..e1b9008799 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -32,7 +32,6 @@ import 
org.apache.pinot.core.operator.query.FastFilteredCountOperator;
 import org.apache.pinot.core.operator.query.GroupByOperator;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.plan.TestUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -40,6 +39,7 @@ import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import 
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.UpsertContext;
+import org.apache.pinot.segment.local.upsert.UpsertUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -161,7 +161,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     assertTrue(operatorClass.isInstance(operator));
 
     SegmentContext segmentContext = new SegmentContext(_upsertIndexSegment);
-    
segmentContext.setQueryableDocIdsSnapshot(TestUtils.getQueryableDocIdsSnapshotFromSegment(_upsertIndexSegment));
+    
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(_upsertIndexSegment));
     Operator<?> upsertOperator = 
PLAN_MAKER.makeSegmentPlanNode(segmentContext, queryContext).run();
     assertTrue(upsertOperatorClass.isInstance(upsertOperator));
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index d59d77b540..182e75129b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -104,7 +104,9 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
 
-  // Tracks all the segments managed by this manager (excluding EmptySegment)
+  // Tracks all the segments managed by this manager, excluding EmptySegment 
and segments out of metadata TTL.
+  // Basically, it's possible that some segments in the table partition are 
not tracked here, as their upsert metadata
+  // is not managed by the manager currently.
   protected final Set<IndexSegment> _trackedSegments = 
ConcurrentHashMap.newKeySet();
   // Track all the immutable segments where changes took place since last 
snapshot was taken.
   // Note: we need to take _snapshotLock RLock while updating this set as 
it may be updated by the multiple
@@ -1124,24 +1126,29 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
       RecordInfo recordInfo) {
     if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
       _upsertViewLock.writeLock().lock();
+      try {
+        doRemoveDocId(oldSegment, oldDocId);
+        doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+      } finally {
+        _upsertViewLock.writeLock().unlock();
+      }
     } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
       _upsertViewLock.readLock().lock();
-    }
-    try {
-      doRemoveDocId(oldSegment, oldDocId);
-      doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
-    } finally {
-      if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
-        _upsertViewLock.writeLock().unlock();
-      } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+      try {
+        doRemoveDocId(oldSegment, oldDocId);
+        doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
         _updatedSegmentsSinceLastRefresh.add(newSegment);
         _updatedSegmentsSinceLastRefresh.add(oldSegment);
+      } finally {
         _upsertViewLock.readLock().unlock();
         // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W 
lock ensures that only one thread
         // can refresh the bitmaps. The other threads that are about to update 
the bitmaps will be blocked until
         // refreshing is done.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
       }
+    } else {
+      doRemoveDocId(oldSegment, oldDocId);
+      doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
     }
     trackUpdatedSegmentsSinceLastSnapshot(oldSegment);
   }
@@ -1154,21 +1161,14 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
    */
   protected void replaceDocId(IndexSegment segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, 
int newDocId, RecordInfo recordInfo) {
-    if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+    if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
+      doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, 
recordInfo);
+    } else {
       _upsertViewLock.readLock().lock();
-    }
-    try {
-      validDocIds.replace(oldDocId, newDocId);
-      if (queryableDocIds != null) {
-        if (recordInfo.isDeleteRecord()) {
-          queryableDocIds.remove(oldDocId);
-        } else {
-          queryableDocIds.replace(oldDocId, newDocId);
-        }
-      }
-    } finally {
-      if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+      try {
+        doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, 
recordInfo);
         _updatedSegmentsSinceLastRefresh.add(segment);
+      } finally {
         _upsertViewLock.readLock().unlock();
         // Batch refresh takes WLock. Do it outside RLock for clarity.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
@@ -1176,16 +1176,28 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  private void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, 
int newDocId, RecordInfo recordInfo) {
+    validDocIds.replace(oldDocId, newDocId);
+    if (queryableDocIds != null) {
+      if (recordInfo.isDeleteRecord()) {
+        queryableDocIds.remove(oldDocId);
+      } else {
+        queryableDocIds.replace(oldDocId, newDocId);
+      }
+    }
+  }
+
   protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap 
validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, 
RecordInfo recordInfo) {
-    if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
-      _upsertViewLock.readLock().lock();
-    }
-    try {
+    if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
       doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
-    } finally {
-      if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+    } else {
+      _upsertViewLock.readLock().lock();
+      try {
+        doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
         _updatedSegmentsSinceLastRefresh.add(segment);
+      } finally {
         _upsertViewLock.readLock().unlock();
         // Batch refresh takes WLock. Do it outside RLock for clarity.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
@@ -1193,8 +1205,8 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
-  private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds, 
ThreadSafeMutableRoaringBitmap queryableDocIds,
-      int docId, RecordInfo recordInfo) {
+  private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, 
RecordInfo recordInfo) {
     validDocIds.add(docId);
     if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
       queryableDocIds.add(docId);
@@ -1202,14 +1214,14 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
   }
 
   protected void removeDocId(IndexSegment segment, int docId) {
-    if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
-      _upsertViewLock.readLock().lock();
-    }
-    try {
+    if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
       doRemoveDocId(segment, docId);
-    } finally {
-      if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+    } else {
+      _upsertViewLock.readLock().lock();
+      try {
+        doRemoveDocId(segment, docId);
         _updatedSegmentsSinceLastRefresh.add(segment);
+      } finally {
         _upsertViewLock.readLock().unlock();
         // Batch refresh takes WLock. Do it outside RLock for clarity.
         doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
@@ -1276,25 +1288,11 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
-  private static MutableRoaringBitmap 
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
-    MutableRoaringBitmap queryableDocIdsSnapshot = null;
-    ThreadSafeMutableRoaringBitmap queryableDocIds = 
segment.getQueryableDocIds();
-    if (queryableDocIds != null) {
-      queryableDocIdsSnapshot = queryableDocIds.getMutableRoaringBitmap();
-    } else {
-      ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
-      if (validDocIds != null) {
-        queryableDocIdsSnapshot = validDocIds.getMutableRoaringBitmap();
-      }
-    }
-    return queryableDocIdsSnapshot;
-  }
-
   private void setSegmentContexts(List<SegmentContext> segmentContexts) {
     for (SegmentContext segmentContext : segmentContexts) {
       IndexSegment segment = segmentContext.getIndexSegment();
       if (_trackedSegments.contains(segment)) {
-        
segmentContext.setQueryableDocIdsSnapshot(getQueryableDocIdsSnapshotFromSegment(segment));
+        
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
       }
     }
   }
@@ -1320,8 +1318,11 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       }
       Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
       for (IndexSegment segment : _trackedSegments) {
-        if (current == null || 
_updatedSegmentsSinceLastRefresh.contains(segment)) {
-          updated.put(segment, getQueryableDocIdsSnapshotFromSegment(segment));
+        // Update bitmap for segment updated since last refresh or not in the 
view yet. This also handles segments
+        // that are tracked by _trackedSegments but not by 
_updatedSegmentsSinceLastRefresh, like those that didn't update
+        // any bitmaps as their docs simply lost all the upsert comparisons 
with the existing docs.
+        if (current == null || current.get(segment) == null || 
_updatedSegmentsSinceLastRefresh.contains(segment)) {
+          updated.put(segment, 
UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
         } else {
           updated.put(segment, current.get(segment));
         }
@@ -1335,6 +1336,16 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
+  @VisibleForTesting
+  Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
+    return _segmentQueryableDocIdsMap;
+  }
+
+  @VisibleForTesting
+  Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
+    return _updatedSegmentsSinceLastRefresh;
+  }
+
   protected void doClose()
       throws IOException {
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index ea5aba5d21..2bdcc798f9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -39,6 +39,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
 
   protected String _tableNameWithType;
   protected UpsertContext _context;
+  protected UpsertConfig.ConsistencyMode _consistencyMode;
 
   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager) {
@@ -68,14 +69,17 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     boolean enablePreload = upsertConfig.isEnablePreload();
     double metadataTTL = upsertConfig.getMetadataTTL();
     double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
-    UpsertConfig.ConsistencyMode consistencyMode = 
upsertConfig.getConsistencyMode();
+    _consistencyMode = upsertConfig.getConsistencyMode();
+    if (_consistencyMode == null) {
+      _consistencyMode = UpsertConfig.ConsistencyMode.NONE;
+    }
     long upsertViewRefreshIntervalMs = 
upsertConfig.getUpsertViewRefreshIntervalMs();
     File tableIndexDir = tableDataManager.getTableDataDir();
     _context = new 
UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
         
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
         
.setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
         
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
-        
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(consistencyMode)
+        
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(_consistencyMode)
         
.setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir)
         .setTableDataManager(tableDataManager).build();
     LOGGER.info(
@@ -84,7 +88,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
             + " deleted Keys TTL: {}, consistency mode: {}, upsert view 
refresh interval: {}ms, table index dir: {}",
         getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns, 
comparisonColumns, deleteRecordColumn,
         hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload, 
metadataTTL, deletedKeysTTL,
-        consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);
+        _consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);
 
     initCustomVariables();
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 4c2f995df8..7b216acfc3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -24,7 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 
 
 /**
@@ -59,9 +62,21 @@ public class ConcurrentMapTableUpsertMetadataManager extends 
BaseTableUpsertMeta
 
   @Override
   public void setSegmentContexts(List<SegmentContext> segmentContexts, 
Map<String, String> queryOptions) {
-    _partitionMetadataManagerMap.forEach(
-        (partitionID, upsertMetadataManager) -> 
upsertMetadataManager.setSegmentContexts(segmentContexts,
-            queryOptions));
+    if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE && 
!QueryOptionsUtils.isSkipUpsertView(queryOptions)) {
+      // Get queryableDocIds bitmaps from partitionMetadataManagers if any 
consistency mode is used.
+      _partitionMetadataManagerMap.forEach(
+          (partitionID, upsertMetadataManager) -> 
upsertMetadataManager.setSegmentContexts(segmentContexts,
+              queryOptions));
+    }
+    // If no consistency mode is used, we get queryableDocIds bitmaps as kept 
by the segment objects directly.
+    // Even if consistency mode is used, we should still check if any segment 
doesn't get its validDocIds bitmap,
+    // because partitionMetadataManagers may not track all segments of the 
table, like those out of the metadata TTL.
+    for (SegmentContext segmentContext : segmentContexts) {
+      if (segmentContext.getQueryableDocIdsSnapshot() == null) {
+        IndexSegment segment = segmentContext.getIndexSegment();
+        
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
+      }
+    }
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 8dd1d8c475..b29daab533 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.spi.IndexSegment;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -38,6 +39,16 @@ public class UpsertUtils {
   private UpsertUtils() {
   }
 
+  @Nullable
+  public static MutableRoaringBitmap 
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
+    ThreadSafeMutableRoaringBitmap queryableDocIds = 
segment.getQueryableDocIds();
+    if (queryableDocIds != null) {
+      return queryableDocIds.getMutableRoaringBitmap();
+    }
+    ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
+    return validDocIds != null ? validDocIds.getMutableRoaringBitmap() : null;
+  }
+
   /**
    * Returns an iterator of {@link RecordInfo} for all the documents from the 
segment.
    */
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 3a6984829e..24b85d1121 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -412,7 +412,7 @@ public class BasePartitionUpsertMetadataManagerTest {
     } finally {
       executor.shutdownNow();
     }
-
+    assertEquals(segmentContexts.size(), 3);
     for (SegmentContext sc : segmentContexts) {
       ThreadSafeMutableRoaringBitmap validDocIds = 
segmentQueryableDocIdsMap.get(sc.getIndexSegment());
       assertNotNull(validDocIds);
@@ -493,10 +493,14 @@ public class BasePartitionUpsertMetadataManagerTest {
     } finally {
       executor.shutdownNow();
     }
+    assertEquals(segmentContexts.size(), 3);
+    assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(), 
3);
+    
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
 
     // Get the upsert view again, and the existing bitmap objects should be 
set in segment contexts.
     // The segmentContexts initialized above holds the same bitmaps objects as 
from the upsert view.
     List<SegmentContext> reuseSegmentContexts = new ArrayList<>();
+    segmentQueryableDocIdsMap.forEach((k, v) -> reuseSegmentContexts.add(new 
SegmentContext(k)));
     upsertMetadataManager.setSegmentContexts(reuseSegmentContexts, new 
HashMap<>());
     for (SegmentContext reuseSC : reuseSegmentContexts) {
       for (SegmentContext sc : segmentContexts) {
@@ -520,7 +524,80 @@ public class BasePartitionUpsertMetadataManagerTest {
     }
 
     // Force refresh the upsert view when getting it, so different bitmap 
objects should be set in segment contexts.
+    
upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().addAll(Arrays.asList(seg01,
 seg02, seg03));
+    List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
+    segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new 
SegmentContext(k)));
+    Map<String, String> queryOptions = new HashMap<>();
+    queryOptions.put("upsertViewFreshnessMs", "0");
+    upsertMetadataManager.setSegmentContexts(refreshSegmentContexts, 
queryOptions);
+    for (SegmentContext refreshSC : refreshSegmentContexts) {
+      for (SegmentContext sc : segmentContexts) {
+        if (refreshSC.getIndexSegment() == sc.getIndexSegment()) {
+          assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), 
sc.getQueryableDocIdsSnapshot());
+        }
+      }
+      ThreadSafeMutableRoaringBitmap validDocIds = 
segmentQueryableDocIdsMap.get(refreshSC.getIndexSegment());
+      assertNotNull(validDocIds);
+      // The upsert view holds a clone of the original queryableDocIds held by 
the segment object.
+      assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), 
validDocIds.getMutableRoaringBitmap());
+      assertEquals(refreshSC.getQueryableDocIdsSnapshot(), 
validDocIds.getMutableRoaringBitmap());
+      // docId=0 in seg01 got invalidated.
+      if (refreshSC.getIndexSegment() == seg01) {
+        assertFalse(refreshSC.getQueryableDocIdsSnapshot().contains(0));
+      }
+      // docId=12 in seg03 was newly added.
+      if (refreshSC.getIndexSegment() == seg03) {
+        assertTrue(refreshSC.getQueryableDocIdsSnapshot().contains(12));
+      }
+    }
+  }
+
+  @Test
+  public void testConsistencyModeSnapshotWithUntrackedSegments()
+      throws Exception {
+    UpsertContext upsertContext = mock(UpsertContext.class);
+    
when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SNAPSHOT);
+    when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L);
+    DummyPartitionUpsertMetadataManager upsertMetadataManager =
+        new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Map<IndexSegment, ThreadSafeMutableRoaringBitmap> 
segmentQueryableDocIdsMap = new HashMap<>();
+    IndexSegment seg01 = mock(IndexSegment.class);
+    ThreadSafeMutableRoaringBitmap validDocIds01 = 
createThreadSafeMutableRoaringBitmap(10);
+    AtomicBoolean called = new AtomicBoolean(false);
+    when(seg01.getValidDocIds()).thenReturn(validDocIds01);
+    upsertMetadataManager.trackSegment(seg01);
+    segmentQueryableDocIdsMap.put(seg01, validDocIds01);
+
+    IndexSegment seg02 = mock(IndexSegment.class);
+    ThreadSafeMutableRoaringBitmap validDocIds02 = 
createThreadSafeMutableRoaringBitmap(11);
+    when(seg02.getValidDocIds()).thenReturn(validDocIds02);
+    upsertMetadataManager.trackSegment(seg02);
+    segmentQueryableDocIdsMap.put(seg02, validDocIds02);
+
+    IndexSegment seg03 = mock(IndexSegment.class);
+    ThreadSafeMutableRoaringBitmap validDocIds03 = 
createThreadSafeMutableRoaringBitmap(12);
+    when(seg03.getValidDocIds()).thenReturn(validDocIds03);
+    upsertMetadataManager.trackSegment(seg03);
+    segmentQueryableDocIdsMap.put(seg03, validDocIds03);
+
+    RecordInfo recordInfo = new RecordInfo(null, 5, null, false);
+    upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 0, 
12, recordInfo);
+
+    List<SegmentContext> segmentContexts = new ArrayList<>();
+    segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new 
SegmentContext(k)));
+    upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>());
+    assertEquals(segmentContexts.size(), 3);
+    assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(), 
3);
+
+    // We can force a refresh of the upsert view by clearing the current 
upsert view, even though there are no updated
+    // segments tracked in _updatedSegmentsSinceLastRefresh.
+    
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
+    upsertMetadataManager.getSegmentQueryableDocIdsMap().clear();
+
     List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
+    segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new 
SegmentContext(k)));
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("upsertViewFreshnessMs", "0");
     upsertMetadataManager.setSegmentContexts(refreshSegmentContexts, 
queryOptions);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
index 743a988769..f2c3b8f837 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentContext.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.segment.spi;
 
+import javax.annotation.Nullable;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 public class SegmentContext {
   private final IndexSegment _indexSegment;
+  @Nullable
   private MutableRoaringBitmap _queryableDocIdsSnapshot = null;
 
   public SegmentContext(IndexSegment indexSegment) {
@@ -33,11 +35,12 @@ public class SegmentContext {
     return _indexSegment;
   }
 
+  @Nullable
   public MutableRoaringBitmap getQueryableDocIdsSnapshot() {
     return _queryableDocIdsSnapshot;
   }
 
-  public void setQueryableDocIdsSnapshot(MutableRoaringBitmap 
queryableDocIdsSnapshot) {
+  public void setQueryableDocIdsSnapshot(@Nullable MutableRoaringBitmap 
queryableDocIdsSnapshot) {
     _queryableDocIdsSnapshot = queryableDocIdsSnapshot;
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8b85f861e5..e268a508af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -352,6 +352,7 @@ public class CommonConstants {
       public static class QueryOptionKey {
         public static final String TIMEOUT_MS = "timeoutMs";
         public static final String SKIP_UPSERT = "skipUpsert";
+        public static final String SKIP_UPSERT_VIEW = "skipUpsertView";
         public static final String UPSERT_VIEW_FRESHNESS_MS = 
"upsertViewFreshnessMs";
         public static final String USE_STAR_TREE = "useStarTree";
         public static final String SCAN_STAR_TREE_NODES = "scanStarTreeNodes";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to