This is an automated email from the ASF dual-hosted git repository. atri pushed a commit to branch pluggable_store in repository https://gitbox.apache.org/repos/asf/pinot.git
commit fc626664846e78f3df735effde777e56e6421993 Author: Atri Sharma <[email protected]> AuthorDate: Mon Sep 4 23:14:29 2023 +0530 More stuff and tests --- ...oncurrentMapPartitionUpsertMetadataManager.java | 10 +++ .../segment/local/upsert/IntelligentKVStore.java | 36 ++++----- ...rrentMapPartitionUpsertMetadataManagerTest.java | 93 ++++++++++++++++++++-- 3 files changed, 110 insertions(+), 29 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 90bce017e8..1d7cdbb395 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -322,11 +322,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp private final IndexSegment _segment; private final int _docId; private final Comparable _comparisonValue; + private boolean _isOffHeap; public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) { _segment = indexSegment; _docId = docId; _comparisonValue = comparisonValue; + _isOffHeap = false; } public IndexSegment getSegment() { @@ -340,5 +342,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp public Comparable getComparisonValue() { return _comparisonValue; } + + public void setIsOffHeap(boolean isOffHeap) { + _isOffHeap = isOffHeap; + } + + public boolean getIsOffHeap() { + return _isOffHeap; + } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java index 9bbae1ecd5..0f2434b7a1 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java @@ -81,19 +81,7 @@ public class IntelligentKVStore { ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeIfPresent(Object key, BiFunction<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ? extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> remappingFunction) { - // TODO: Concurrency checks - ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation = null; - ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation = null; - - if (_primaryKeyToRecordLocationMap.containsKey(key)) { - primaryStoreLocation = computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); - } - - if (_offheapStore.containsKey(key)) { - offHeapStoreLocation = computeIfPresentInternal(_offheapStore, key, remappingFunction); - } - - return comparePrimaryAndOffheapValues(primaryStoreLocation, offHeapStoreLocation); + return computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); } ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeIfPresentInternal( @@ -115,18 +103,23 @@ public class IntelligentKVStore { return null; } + /** + * Here is how this method works: + * + * All updates are sent to the primary store. Offheap store is updated only periodically when + * keys are transferred. + * + * The idea is that the majority of offheap keys should not be updated regularly - hence they + * should be restricted to off heap only. If an off heap key is updated, it will become "hot" + * until it expires again (i.e. breaches the update TTL). For that duration, the key will + * have a duplicate in the offheap store -- but it should be fine as long as the number of + * updated keys that are cold is significantly lower than the total count.
+ */ ConcurrentMapPartitionUpsertMetadataManager.RecordLocation compute(Object key, BiFunction<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ? extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> remappingFunction) { - ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation = null; - ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation = null; - - primaryStoreLocation = computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); - - offHeapStoreLocation = computeInternal(_offheapStore, key, remappingFunction); - - return comparePrimaryAndOffheapValues(primaryStoreLocation, offHeapStoreLocation); + return computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction); } ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeInternal( @@ -252,6 +245,7 @@ public class IntelligentKVStore { ConcurrentMapPartitionUpsertMetadataManager.RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.get(key); + currentRecordLocation.setIsOffHeap(true); _offheapStore.put(key, currentRecordLocation); // If we got here, assuming that the write went through diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index d25bc02728..76f7f7339e 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -107,6 +107,20 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { verifyAddOutOfTTLSegment(); } + @Test + public void testUpsertMetadataTransferColdKeys() { + verifyTransferColdKeys(new Integer(80), new 
Integer(120)); + verifyTransferColdKeys(new Float(80), new Float(120)); + verifyTransferColdKeys(new Double(80), new Double(120)); + verifyTransferColdKeys(new Long(80), new Long(120)); + verifyPersistAndLoadWatermark(); + verifyAddSegmentForTTL(new Integer(80)); + verifyAddSegmentForTTL(new Float(80)); + verifyAddSegmentForTTL(new Double(80)); + verifyAddSegmentForTTL(new Long(80)); + verifyAddOutOfTTLSegment(); + } + private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) throws IOException { String comparisonColumn = "timeCol"; @@ -535,8 +549,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { return new PrimaryKey(new Object[]{value}); } - private static void checkRecordLocation(IntelligentKVStore recordLocationMap, int keyValue, - IndexSegment segment, int docId, int comparisonValue, HashFunction hashFunction) { + private static void checkRecordLocation(IntelligentKVStore recordLocationMap, int keyValue, IndexSegment segment, + int docId, int comparisonValue, HashFunction hashFunction) { RecordLocation recordLocation = recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction)); assertNotNull(recordLocation); @@ -808,8 +822,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir, mock(ServerMetrics.class)); - IntelligentKVStore recordLocationMap = - upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add record to update largestSeenTimestamp, largest seen timestamp: earlierComparisonValue ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap(); @@ -862,6 +875,63 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { 
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3}); } + private void verifyTransferColdKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue) { + File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME); + + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), + Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir, + mock(ServerMetrics.class)); + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + + // Add record to update largestSeenTimestamp, largest seen timestamp: earlierComparisonValue + ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap(); + MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null); + upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false)); + checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + + // Add a segment with segmentEndTime = earlierComparisonValue, so it will not be skipped + int numRecords = 4; + int[] primaryKeys = new int[]{0, 1, 2, 3}; + Number[] timestamps = new Number[]{100, 100, 120, 80}; + ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); + List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); + ImmutableSegmentImpl segment1 = + mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), + earlierComparisonValue, null); + + int[] docIds1 = new int[]{0, 1, 2, 3}; + MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); + validDocIdsSnapshot1.add(docIds1); + + // load segment1. 
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, + getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator()); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE, false); + + // Add record to update largestSeenTimestamp, largest seen timestamp: largerComparisonValue + upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false)); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5); + + // records before (largest seen timestamp - TTL) are cold and are moved from the primary to the secondary store. + upsertMetadataManager.transferKeysBetweenPrimaryAndSecondaryStorage(); + assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4); + assertEquals(recordLocationMap.sizeOfSecondaryStore(), 1); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE, false); + checkRecordLocationForIntelligentKVStore(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE, true); + + // ValidDocIds for out-of-ttl records should not be removed.
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3}); + } + private void verifyAddOutOfTTLSegment() { File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME); @@ -869,8 +939,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir, mock(ServerMetrics.class)); - IntelligentKVStore recordLocationMap = - upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add record to update largestSeenTimestamp, largest seen timestamp: 80 ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap(); @@ -935,8 +1004,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir, mock(ServerMetrics.class)); - IntelligentKVStore recordLocationMap = - upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add record to update largestSeenTimestamp, largest seen timestamp: comparisonValue ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap(); @@ -973,6 +1041,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { return recordInfoList; } + private static void checkRecordLocationForIntelligentKVStore(IntelligentKVStore recordLocationMap, int keyValue, + IndexSegment segment, int docId, Number comparisonValue, HashFunction hashFunction, boolean isOffheap) { + checkRecordLocationForTTL(recordLocationMap, keyValue, segment, docId, comparisonValue, hashFunction); + + RecordLocation recordLocation = + 
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction)); + assertEquals(recordLocation.getIsOffHeap(), isOffheap); + } + // Add the following utils function since the Comparison column is a long value for TTL enabled upsert table. private static void checkRecordLocationForTTL(IntelligentKVStore recordLocationMap, int keyValue, IndexSegment segment, int docId, Number comparisonValue, HashFunction hashFunction) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
