This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch pluggable_store
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit fc626664846e78f3df735effde777e56e6421993
Author: Atri Sharma <[email protected]>
AuthorDate: Mon Sep 4 23:14:29 2023 +0530

    More stuff and tests
---
 ...oncurrentMapPartitionUpsertMetadataManager.java | 10 +++
 .../segment/local/upsert/IntelligentKVStore.java   | 36 ++++-----
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 93 ++++++++++++++++++++--
 3 files changed, 110 insertions(+), 29 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 90bce017e8..1d7cdbb395 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -322,11 +322,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     private final IndexSegment _segment;
     private final int _docId;
     private final Comparable _comparisonValue;
+    private boolean _isOffHeap;
 
     public RecordLocation(IndexSegment indexSegment, int docId, Comparable 
comparisonValue) {
       _segment = indexSegment;
       _docId = docId;
       _comparisonValue = comparisonValue;
+      _isOffHeap = false;
     }
 
     public IndexSegment getSegment() {
@@ -340,5 +342,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     public Comparable getComparisonValue() {
       return _comparisonValue;
     }
+
+    public void setIsOffHeap(boolean isOffHeap) {
+      _isOffHeap = isOffHeap;
+    }
+
+    public boolean getIsOffHeap() {
+      return _isOffHeap;
+    }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
index 9bbae1ecd5..0f2434b7a1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
@@ -81,19 +81,7 @@ public class IntelligentKVStore {
   ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
computeIfPresent(Object key,
       BiFunction<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
           extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
remappingFunction) {
-    // TODO: Concurrency checks
-    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
primaryStoreLocation = null;
-    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
offHeapStoreLocation = null;
-
-    if (_primaryKeyToRecordLocationMap.containsKey(key)) {
-      primaryStoreLocation = 
computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, 
remappingFunction);
-    }
-
-    if (_offheapStore.containsKey(key)) {
-      offHeapStoreLocation = computeIfPresentInternal(_offheapStore, key, 
remappingFunction);
-    }
-
-    return comparePrimaryAndOffheapValues(primaryStoreLocation, 
offHeapStoreLocation);
+    return computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, 
remappingFunction);
   }
 
   ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
computeIfPresentInternal(
@@ -115,18 +103,23 @@ public class IntelligentKVStore {
     return null;
   }
 
+  /**
+   * Here is how this method works:
+   *
+   * All updates are sent to the primary store. Offheap store is updated only 
periodically when
+   * keys are transferred.
+   *
+   * The idea is that the majority of offheap keys should not be updated 
regularly - hence they
+   * should be restricted to off heap only. If an off heap key is updated, it 
will become "hot"
+   * until it expires again (i.e. breaches the update TTL). For that duration, 
the key will
+   * have a duplicate in the offheap store -- but it should be fine as long as 
the number of
+   * updated keys that are cold is significantly lower than the total count.
+   */
   ConcurrentMapPartitionUpsertMetadataManager.RecordLocation compute(Object 
key,
       BiFunction<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
           extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
remappingFunction) {
 
-    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
primaryStoreLocation = null;
-    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
offHeapStoreLocation = null;
-
-      primaryStoreLocation = computeInternal(_primaryKeyToRecordLocationMap, 
key, remappingFunction);
-
-      offHeapStoreLocation = computeInternal(_offheapStore, key, 
remappingFunction);
-
-    return comparePrimaryAndOffheapValues(primaryStoreLocation, 
offHeapStoreLocation);
+    return computeInternal(_primaryKeyToRecordLocationMap, key, 
remappingFunction);
   }
 
   ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeInternal(
@@ -252,6 +245,7 @@ public class IntelligentKVStore {
 
     ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
currentRecordLocation =
         _primaryKeyToRecordLocationMap.get(key);
+    currentRecordLocation.setIsOffHeap(true);
     _offheapStore.put(key, currentRecordLocation);
 
     // If we got here, assuming that the write went through
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index d25bc02728..76f7f7339e 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -107,6 +107,20 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     verifyAddOutOfTTLSegment();
   }
 
+  @Test
+  public void testUpsertMetadataTransferColdKeys() {
+    verifyTransferColdKeys(new Integer(80), new Integer(120));
+    verifyTransferColdKeys(new Float(80), new Float(120));
+    verifyTransferColdKeys(new Double(80), new Double(120));
+    verifyTransferColdKeys(new Long(80), new Long(120));
+    verifyPersistAndLoadWatermark();
+    verifyAddSegmentForTTL(new Integer(80));
+    verifyAddSegmentForTTL(new Float(80));
+    verifyAddSegmentForTTL(new Double(80));
+    verifyAddSegmentForTTL(new Long(80));
+    verifyAddOutOfTTLSegment();
+  }
+
   private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, 
boolean enableSnapshot)
       throws IOException {
     String comparisonColumn = "timeCol";
@@ -535,8 +549,8 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     return new PrimaryKey(new Object[]{value});
   }
 
-  private static void checkRecordLocation(IntelligentKVStore 
recordLocationMap, int keyValue,
-      IndexSegment segment, int docId, int comparisonValue, HashFunction 
hashFunction) {
+  private static void checkRecordLocation(IntelligentKVStore 
recordLocationMap, int keyValue, IndexSegment segment,
+      int docId, int comparisonValue, HashFunction hashFunction) {
     RecordLocation recordLocation =
         
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), 
hashFunction));
     assertNotNull(recordLocation);
@@ -808,8 +822,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, tableDir,
             mock(ServerMetrics.class));
-    IntelligentKVStore recordLocationMap =
-        upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 
earlierComparisonValue
     ThreadSafeMutableRoaringBitmap validDocIds0 = new 
ThreadSafeMutableRoaringBitmap();
@@ -862,6 +875,63 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2, 3});
   }
 
+  private void verifyTransferColdKeys(Comparable earlierComparisonValue, 
Comparable largerComparisonValue) {
+    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, tableDir,
+            mock(ServerMetrics.class));
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: 
earlierComparisonValue
+    ThreadSafeMutableRoaringBitmap validDocIds0 = new 
ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+    upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false));
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+
+    // Add a segment with segmentEndTime = earlierComparisonValue, so it will 
not be skipped
+    int numRecords = 4;
+    int[] primaryKeys = new int[]{0, 1, 2, 3};
+    Number[] timestamps = new Number[]{100, 100, 120, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, 
Collections.singletonList("timeCol"),
+            earlierComparisonValue, null);
+
+    int[] docIds1 = new int[]{0, 1, 2, 3};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+
+    // load segment1.
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoListForTTL(numRecords, primaryKeys, 
timestamps).iterator());
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 0, segment1, 
0, 100, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 1, segment1, 
1, 100, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 2, segment1, 
2, 120, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 3, segment1, 
3, 80, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 10, segment0, 
1, 80, HashFunction.NONE, false);
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: 
largerComparisonValue
+    upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false));
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
+
+    // Records before (largest seen timestamp - TTL) are cold: they are moved 
from the primary store to the secondary (off-heap) store.
+    upsertMetadataManager.transferKeysBetweenPrimaryAndSecondaryStorage();
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
+    assertEquals(recordLocationMap.sizeOfSecondaryStore(), 1);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 0, segment1, 
0, 100, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 1, segment1, 
1, 100, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 2, segment1, 
2, 120, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 10, segment0, 
0, 120, HashFunction.NONE, false);
+    checkRecordLocationForIntelligentKVStore(recordLocationMap, 3, segment1, 
3, 80, HashFunction.NONE, true);
+
+    // ValidDocIds for out-of-ttl records should not be removed.
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2, 3});
+  }
+
   private void verifyAddOutOfTTLSegment() {
     File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
 
@@ -869,8 +939,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, tableDir,
             mock(ServerMetrics.class));
-    IntelligentKVStore recordLocationMap =
-        upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 80
     ThreadSafeMutableRoaringBitmap validDocIds0 = new 
ThreadSafeMutableRoaringBitmap();
@@ -935,8 +1004,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, tableDir,
             mock(ServerMetrics.class));
-    IntelligentKVStore recordLocationMap =
-        upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 
comparisonValue
     ThreadSafeMutableRoaringBitmap validDocIds0 = new 
ThreadSafeMutableRoaringBitmap();
@@ -973,6 +1041,15 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     return recordInfoList;
   }
 
+  private static void 
checkRecordLocationForIntelligentKVStore(IntelligentKVStore recordLocationMap, 
int keyValue,
+      IndexSegment segment, int docId, Number comparisonValue, HashFunction 
hashFunction, boolean isOffheap) {
+    checkRecordLocationForTTL(recordLocationMap, keyValue, segment, docId, 
comparisonValue, hashFunction);
+
+    RecordLocation recordLocation =
+        
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), 
hashFunction));
+    assertEquals(recordLocation.getIsOffHeap(), isOffheap);
+  }
+
   // Add the following utils function since the Comparison column is a long 
value for TTL enabled upsert table.
   private static void checkRecordLocationForTTL(IntelligentKVStore 
recordLocationMap, int keyValue,
       IndexSegment segment, int docId, Number comparisonValue, HashFunction 
hashFunction) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to