This is an automated email from the ASF dual-hosted git repository. atri pushed a commit to branch pluggable_store in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 48789ff3cbb9354c307f56c6ac13dc0df399743c Author: Atri Sharma <[email protected]> AuthorDate: Mon Sep 4 15:16:40 2023 +0530 All tests passing but double writes during compute --- .../upsert/BasePartitionUpsertMetadataManager.java | 18 +++ ...oncurrentMapPartitionUpsertMetadataManager.java | 16 ++- .../segment/local/upsert/IntelligentKVStore.java | 127 ++++++++++++++++++--- .../upsert/PartitionUpsertMetadataManager.java | 5 + ...rrentMapPartitionUpsertMetadataManagerTest.java | 48 ++++---- 5 files changed, 170 insertions(+), 44 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 2b7b36e9ef..b11d04b73e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -692,6 +692,24 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } + @Override + public void transferKeysBetweenPrimaryAndSecondaryStorage() { + if (_metadataTTL <= 0) { + return; + } + if (_stopped) { + _logger.info("Skip transferring cold primary keys because metadata manager is already stopped"); + return; + } + + startOperation(); + try { + transferKeysBetweenPrimaryAndSecondaryStorage(); + } finally { + finishOperation(); + } + } + /** * Removes all primary keys that have comparison value smaller than (largestSeenComparisonValue - TTL). 
*/ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 306b64db02..90bce017e8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -64,7 +64,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp @Override protected long getNumPrimaryKeys() { - return _primaryKeyToRecordLocationMap.size(); + return _primaryKeyToRecordLocationMap.sizeOfPrimaryStore(); } @Override @@ -236,6 +236,18 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp persistWatermark(_largestSeenComparisonValue); } + @Override + public void transferKeysBetweenPrimaryAndSecondaryStorage() { + //TODO: Declare own TTL + double threshold = _largestSeenComparisonValue - _metadataTTL; + _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> { + if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) { + _primaryKeyToRecordLocationMap.transferKey(primaryKey); + } + }); + persistWatermark(_largestSeenComparisonValue); + } + @Override protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) { ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); @@ -279,7 +291,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // Update metrics _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - _primaryKeyToRecordLocationMap.size()); + _primaryKeyToRecordLocationMap.sizeOfPrimaryStore()); } @Override diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java index 18e115dd17..9bbae1ecd5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java @@ -18,8 +18,12 @@ public class IntelligentKVStore { _offheapStore = offheapStore; } - public int size() { - return _primaryKeyToRecordLocationMap.size() + _offheapStore.size(); + public int sizeOfPrimaryStore() { + return _primaryKeyToRecordLocationMap.size(); + } + + public int sizeOfSecondaryStore() { + return _offheapStore.size(); } public boolean isEmpty() { @@ -51,7 +55,7 @@ public class IntelligentKVStore { } // We do a double put - _primaryKeyToRecordLocationMap.putIfAbsent(key, (ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value); + _primaryKeyToRecordLocationMap.put(key, (ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value); _offheapStore.put(key, (ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value); return value; @@ -78,11 +82,18 @@ public class IntelligentKVStore { BiFunction<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ? 
extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> remappingFunction) { // TODO: Concurrency checks + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation = null; + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation = null; + if (_primaryKeyToRecordLocationMap.containsKey(key)) { - return computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); - } else { - return computeIfPresentInternal(_offheapStore, key, remappingFunction); + primaryStoreLocation = computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); + } + + if (_offheapStore.containsKey(key)) { + offHeapStoreLocation = computeIfPresentInternal(_offheapStore, key, remappingFunction); } + + return comparePrimaryAndOffheapValues(primaryStoreLocation, offHeapStoreLocation); } ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeIfPresentInternal( @@ -108,11 +119,14 @@ public class IntelligentKVStore { BiFunction<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ? 
extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> remappingFunction) { - if (_primaryKeyToRecordLocationMap.containsKey(key)) { - return computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); - } else { - return computeInternal(_offheapStore, key, remappingFunction); - } + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation = null; + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation = null; + + primaryStoreLocation = computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); + + offHeapStoreLocation = computeInternal(_offheapStore, key, remappingFunction); + + return comparePrimaryAndOffheapValues(primaryStoreLocation, offHeapStoreLocation); } ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeInternal( @@ -139,11 +153,22 @@ public class IntelligentKVStore { } } - void forEach(BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) { - //TODO: Extend this to offheap store as well + void forEachOnPrimaryDataStore( + BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) { forEachInternal(_primaryKeyToRecordLocationMap, action); } + void forEachOnOffheapDataStore( + BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) { + forEachInternal(_offheapStore, action); + } + + void forEach(BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) { + //TODO: Make these two operations concurrent? + forEachOnPrimaryDataStore(action); + forEachOnOffheapDataStore(action); + } + void forEachInternal(Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> map, BiConsumer<? super Object, ? 
super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) { for (Map.Entry<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> entry : map.entrySet()) { @@ -151,14 +176,39 @@ public class IntelligentKVStore { } } - boolean remove(Object key, - Object value) { + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation comparePrimaryAndOffheapValues( + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation, + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation) { + if (primaryStoreLocation == null) { + return offHeapStoreLocation; + } + + if (offHeapStoreLocation == null) { + return primaryStoreLocation; + } + + // If both are null, we would have handled it in the above conditions + + if (primaryStoreLocation.getComparisonValue().compareTo(offHeapStoreLocation.getComparisonValue()) >= 0) { + return primaryStoreLocation; + } + return offHeapStoreLocation; + } + + boolean remove(Object key, Object value) { + boolean isRemoved = false; if (_primaryKeyToRecordLocationMap.containsKey(key)) { - return removeInternal(_primaryKeyToRecordLocationMap, key, value); + removeInternal(_primaryKeyToRecordLocationMap, key, value); + isRemoved = true; } - return removeInternal(_offheapStore, key, value); + if (_offheapStore.containsKey(key)) { + removeInternal(_offheapStore, key, value); + isRemoved = true; + } + + return isRemoved; } boolean removeInternal(Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> map, Object key, @@ -170,4 +220,47 @@ public class IntelligentKVStore { return false; } } + + /** + * Here is how this method works: + * + * If primary store does not contain the key, then no action needs to be taken + * + * If primary store contains the key, then the first thing we do is to perform + * the write on the offheap store. 
Once the offheap store write is completed, + * we do another read on the primary store and check whether the value we + * previously read for this key from the primary store is unchanged. + * If yes, then delete the value from the primary store. + * If not, then do nothing. This ensures that hot keys are still kept + * in the primary store and that a key is naturally removed from the primary + * store once it cools down. + * + * There is a caveat here -- if the majority of keys are hot, then we are essentially + * keeping a duplicate of each hot key on disk. The underlying assumption + * is that the update rate of a key drops drastically once the configured "hot" TTL + * has elapsed. + * + * Once a key lives only in the offheap store, all subsequent updates are applied + * directly to the offheap store. + * @param key the primary key to move from the primary store to the offheap store + */ + void transferKey(Object key) { + + if (!(_primaryKeyToRecordLocationMap.containsKey(key))) { + return; + } + + ConcurrentMapPartitionUpsertMetadataManager.RecordLocation currentRecordLocation = + _primaryKeyToRecordLocationMap.get(key); + _offheapStore.put(key, currentRecordLocation); + + // If we got here, we assume that the offheap write went through + if (_primaryKeyToRecordLocationMap.get(key).getComparisonValue() + .compareTo(currentRecordLocation.getComparisonValue()) == 0) { + // Equal values, no updates. Remove from the primary store + // NOTE: There is a race condition here: the value can change + // between the read above and the remove below.
+ _primaryKeyToRecordLocationMap.remove(key); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index 6ebd08c74c..d59a8b3dda 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -102,6 +102,11 @@ public interface PartitionUpsertMetadataManager extends Closeable { */ void removeExpiredPrimaryKeys(); + /** + * Move the once hot now cold keys from primary to secondary storage. + */ + void transferKeysBetweenPrimaryAndSecondaryStorage(); + /** * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted. */ diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index ec2b89065e..d25bc02728 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -24,11 +24,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeMap; import javax.annotation.Nullable; -import jnr.ffi.annotations.In; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; @@ -141,7 +139,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { 
upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator()); trackedSegments.add(segment1); // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} - assertEquals(recordLocationMap.size(), 3); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 3); checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); @@ -171,7 +169,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -184,7 +182,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.addSegment(emptySegment); // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -201,7 +199,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // original segment1: 1 -> {4, 120} (not in the map) // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, 
newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -214,7 +212,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.removeSegment(segment1); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -227,7 +225,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.removeSegment(emptySegment); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -239,7 +237,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.removeSegment(segment2); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map) // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 1); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); @@ -251,7 +249,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // Remove new segment1, should be no-op upsertMetadataManager.removeSegment(newSegment1); // new segment1: 1 -> {4, 120} - 
assertEquals(recordLocationMap.size(), 1); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(trackedSegments, Collections.singleton(newSegment1)); @@ -306,7 +304,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator()); trackedSegments.add(segment1); // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} - assertEquals(recordLocationMap.size(), 3); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 3); checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); @@ -339,7 +337,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -354,7 +352,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.addSegment(emptySegment); // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -376,7 +374,7 @@ public 
class ConcurrentMapPartitionUpsertMetadataManagerTest { // original segment1: 1 -> {4, 120} (not in the map) // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -392,7 +390,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.removeSegment(segment1); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -408,7 +406,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.removeSegment(emptySegment); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -422,7 +420,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.removeSegment(segment2); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map) // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 1); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1); 
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); @@ -436,7 +434,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // Remove new segment1, should be no-op upsertMetadataManager.removeSegment(newSegment1); // new segment1: 1 -> {4, 120} - assertEquals(recordLocationMap.size(), 1); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(trackedSegments, Collections.singleton(newSegment1)); @@ -836,7 +834,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // load segment1. upsertMetadataManager.addSegment(segment1, validDocIds1, null, getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator()); - assertEquals(recordLocationMap.size(), 5); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); @@ -845,7 +843,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // Add record to update largestSeenTimestamp, largest seen timestamp: largerComparisonValue upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false)); - assertEquals(recordLocationMap.size(), 5); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 2, segment1, 
2, 120, HashFunction.NONE); @@ -854,7 +852,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // records before (largest seen timestamp - TTL) are expired and removed from upsertMetadata. upsertMetadataManager.removeExpiredPrimaryKeys(); - assertEquals(recordLocationMap.size(), 4); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); @@ -897,7 +895,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // load segment1 with segmentEndTime: 80, largest seen timestamp: 80. the segment will be loaded. upsertMetadataManager.addSegment(segment1, validDocIds1, null, getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator()); - assertEquals(recordLocationMap.size(), 5); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); @@ -908,7 +906,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // Add record to update largestSeenTimestamp, largest seen timestamp: 120 upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(0), 0, new Double(120), false)); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1, 2, 3}); - assertEquals(recordLocationMap.size(), 5); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, 
HashFunction.NONE); @@ -927,7 +925,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new Double(80), validDocIdsSnapshot2); upsertMetadataManager.addSegment(segment2); // out of ttl segment should not be added to recordLocationMap - assertEquals(recordLocationMap.size(), 5); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); } private void verifyAddSegmentForTTL(Comparable comparisonValue) { @@ -961,7 +959,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { // load segment1. upsertMetadataManager.addSegment(segment1); - assertEquals(recordLocationMap.size(), 1); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
