This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch pluggable_store
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 48789ff3cbb9354c307f56c6ac13dc0df399743c
Author: Atri Sharma <[email protected]>
AuthorDate: Mon Sep 4 15:16:40 2023 +0530

    All tests passing but double writes during compute
---
 .../upsert/BasePartitionUpsertMetadataManager.java |  18 +++
 ...oncurrentMapPartitionUpsertMetadataManager.java |  16 ++-
 .../segment/local/upsert/IntelligentKVStore.java   | 127 ++++++++++++++++++---
 .../upsert/PartitionUpsertMetadataManager.java     |   5 +
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  48 ++++----
 5 files changed, 170 insertions(+), 44 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 2b7b36e9ef..b11d04b73e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -692,6 +692,24 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  @Override
+  public void transferKeysBetweenPrimaryAndSecondaryStorage() {
+    if (_metadataTTL <= 0) {
+      return;
+    }
+    if (_stopped) {
+      _logger.info("Skip transferring cold primary keys because metadata 
manager is already stopped");
+      return;
+    }
+
+    startOperation();
+    try {
+      transferKeysBetweenPrimaryAndSecondaryStorage();
+    } finally {
+      finishOperation();
+    }
+  }
+
   /**
    * Removes all primary keys that have comparison value smaller than (largestSeenComparisonValue - TTL).
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 306b64db02..90bce017e8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -64,7 +64,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   @Override
   protected long getNumPrimaryKeys() {
-    return _primaryKeyToRecordLocationMap.size();
+    return _primaryKeyToRecordLocationMap.sizeOfPrimaryStore();
   }
 
   @Override
@@ -236,6 +236,18 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     persistWatermark(_largestSeenComparisonValue);
   }
 
+  @Override
+  public void transferKeysBetweenPrimaryAndSecondaryStorage() {
+    //TODO: Declare own TTL
+    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+      if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+        _primaryKeyToRecordLocationMap.transferKey(primaryKey);
+      }
+    });
+    persistWatermark(_largestSeenComparisonValue);
+  }
+
   @Override
   protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
@@ -279,7 +291,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        _primaryKeyToRecordLocationMap.size());
+        _primaryKeyToRecordLocationMap.sizeOfPrimaryStore());
   }
 
   @Override
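
For reference, the TTL cut-off used by the transferKeysBetweenPrimaryAndSecondaryStorage() override above can be
sketched in isolation. The following is a minimal, hypothetical stand-alone example (plain HashMap, made-up key names;
not code from this patch) showing which keys fall below largestSeenComparisonValue - metadataTTL and would therefore
be handed to transferKey():

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in: selects "cold" keys the same way the override above does,
// i.e. keys whose comparison value is below largestSeenComparisonValue - metadataTTL.
public class ColdKeySelectionSketch {
  public static void main(String[] args) {
    double largestSeenComparisonValue = 200;
    double metadataTTL = 60;
    double threshold = largestSeenComparisonValue - metadataTTL; // 140

    // primary key -> last seen comparison value (timestamps in this example)
    Map<String, Double> comparisonValues = new HashMap<>();
    comparisonValues.put("pk-hot", 180.0);   // above the threshold, stays in the primary store
    comparisonValues.put("pk-cold", 100.0);  // below the threshold, candidate for transferKey()

    comparisonValues.forEach((primaryKey, comparisonValue) -> {
      if (comparisonValue < threshold) {
        System.out.println(primaryKey + " would be transferred to the secondary store");
      } else {
        System.out.println(primaryKey + " stays in the primary store");
      }
    });
  }
}
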
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
index 18e115dd17..9bbae1ecd5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
@@ -18,8 +18,12 @@ public class IntelligentKVStore {
     _offheapStore = offheapStore;
   }
 
-  public int size() {
-    return _primaryKeyToRecordLocationMap.size() + _offheapStore.size();
+  public int sizeOfPrimaryStore() {
+    return _primaryKeyToRecordLocationMap.size();
+  }
+
+  public int sizeOfSecondaryStore() {
+    return _offheapStore.size();
   }
 
   public boolean isEmpty() {
@@ -51,7 +55,7 @@ public class IntelligentKVStore {
     }
 
     // We do a double put
-    _primaryKeyToRecordLocationMap.putIfAbsent(key, (ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value);
+    _primaryKeyToRecordLocationMap.put(key, (ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value);
     _offheapStore.put(key, (ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value);
 
     return value;
@@ -78,11 +82,18 @@ public class IntelligentKVStore {
       BiFunction<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
           extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> remappingFunction) {
     // TODO: Concurrency checks
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation = null;
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation = null;
+
     if (_primaryKeyToRecordLocationMap.containsKey(key)) {
-      return computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction);
-    } else {
-      return computeIfPresentInternal(_offheapStore, key, remappingFunction);
+      primaryStoreLocation = computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction);
+    }
+
+    if (_offheapStore.containsKey(key)) {
+      offHeapStoreLocation = computeIfPresentInternal(_offheapStore, key, remappingFunction);
     }
+
+    return comparePrimaryAndOffheapValues(primaryStoreLocation, offHeapStoreLocation);
   }
 
   ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeIfPresentInternal(
@@ -108,11 +119,14 @@ public class IntelligentKVStore {
       BiFunction<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
           extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> remappingFunction) {
 
-    if (_primaryKeyToRecordLocationMap.containsKey(key)) {
-      return computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction);
-    } else {
-      return computeInternal(_offheapStore, key, remappingFunction);
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
primaryStoreLocation = null;
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
offHeapStoreLocation = null;
+
+
+      primaryStoreLocation = computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction);
+
+      offHeapStoreLocation = computeInternal(_offheapStore, key, remappingFunction);
+    return comparePrimaryAndOffheapValues(primaryStoreLocation, 
offHeapStoreLocation);
   }
 
   ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeInternal(
@@ -139,11 +153,22 @@ public class IntelligentKVStore {
     }
   }
 
-  void forEach(BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
-    //TODO: Extend this to offheap store as well
+  void forEachOnPrimaryDataStore(
+      BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
     forEachInternal(_primaryKeyToRecordLocationMap, action);
   }
 
+  void forEachOnOffheapDataStore(
+      BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
+    forEachInternal(_offheapStore, action);
+  }
+
+  void forEach(BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
+    //TODO: Make these two operations concurrent?
+    forEachOnPrimaryDataStore(action);
+    forEachOnOffheapDataStore(action);
+  }
+
   void forEachInternal(Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> map,
       BiConsumer<? super Object, ? super ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
     for (Map.Entry<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> entry : map.entrySet()) {
@@ -151,14 +176,39 @@ public class IntelligentKVStore {
     }
   }
 
-  boolean remove(Object key,
-      Object value) {
+  ConcurrentMapPartitionUpsertMetadataManager.RecordLocation comparePrimaryAndOffheapValues(
+      ConcurrentMapPartitionUpsertMetadataManager.RecordLocation primaryStoreLocation,
+      ConcurrentMapPartitionUpsertMetadataManager.RecordLocation offHeapStoreLocation) {
+    if (primaryStoreLocation == null) {
+      return offHeapStoreLocation;
+    }
+
+    if (offHeapStoreLocation == null) {
+      return primaryStoreLocation;
+    }
+
+    // If both are null, we would have handled it in the above conditions
+
+    if (primaryStoreLocation.getComparisonValue().compareTo(offHeapStoreLocation.getComparisonValue()) >= 0) {
+      return primaryStoreLocation;
+    }
 
+    return offHeapStoreLocation;
+  }
+
+  boolean remove(Object key, Object value) {
+    boolean isRemoved = false;
     if (_primaryKeyToRecordLocationMap.containsKey(key)) {
-      return removeInternal(_primaryKeyToRecordLocationMap, key, value);
+      removeInternal(_primaryKeyToRecordLocationMap, key, value);
+      isRemoved = true;
     }
 
-    return removeInternal(_offheapStore, key, value);
+    if (_offheapStore.containsKey(key)) {
+      removeInternal(_offheapStore, key, value);
+      isRemoved = true;
+    }
+
+    return isRemoved;
   }
 
   boolean removeInternal(Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> map, Object key,
@@ -170,4 +220,47 @@ public class IntelligentKVStore {
       return false;
     }
   }
+
+  /**
+   * Here is how this method works:
+   *
+   * If the primary store does not contain the key, no action is taken.
+   *
+   * If the primary store contains the key, we first perform the write on the
+   * offheap store. Once the offheap store write is completed, we do another
+   * read on the primary store and check whether the previously known value
+   * for this key is still the same.
+   * If yes, delete the value from the primary store.
+   * If not, do nothing. This ensures that hot keys are kept in the primary
+   * store and a key is naturally removed from the primary store once it
+   * cools down.
+   *
+   * There is a caveat here -- if the majority of keys are hot, then we are
+   * essentially keeping a duplicate of each such key on disk. The invariant
+   * here is that the update rate of a key drops drastically after the
+   * configured "hot" TTL.
+   *
+   * Once a key has successfully moved to the offheap store only, all updates
+   * are percolated directly to the offheap store.
+   * @param key the primary key to transfer
+   */
+  void transferKey(Object key) {
+
+    if (!(_primaryKeyToRecordLocationMap.containsKey(key))) {
+      return;
+    }
+
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation currentRecordLocation =
+        _primaryKeyToRecordLocationMap.get(key);
+    _offheapStore.put(key, currentRecordLocation);
+
+    // If we got here, assuming that the write went through
+    if (_primaryKeyToRecordLocationMap.get(key).getComparisonValue()
+        .compareTo(currentRecordLocation.getComparisonValue()) == 0) {
+      // Equal values, no updates. Remove from the primary store
+      // NOTE: There is a race condition here: the value can change
+      // between the read above and this point.
+      _primaryKeyToRecordLocationMap.remove(key);
+    }
+  }
 }
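
The transfer-then-verify flow documented on transferKey() above (write to the offheap store first, then drop the key
from the primary store only if it has not been updated in the meantime) can be illustrated with a small,
self-contained sketch. This is a hypothetical stand-in built on two ConcurrentHashMaps, not the patch's own classes;
it also uses the conditional ConcurrentMap.remove(key, value) to narrow the race window that the inline NOTE
describes, whereas the patch compares comparison values and then removes unconditionally.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal stand-in for the transfer-then-verify flow: copy the value to the
// secondary map, then remove it from the primary map only if it is still the
// value that was copied (so keys that stayed hot remain in the primary tier).
public class TransferKeySketch {
  private final Map<String, Long> _primary = new ConcurrentHashMap<>();
  private final Map<String, Long> _secondary = new ConcurrentHashMap<>();

  void transferKey(String key) {
    Long current = _primary.get(key);
    if (current == null) {
      return; // nothing to move
    }
    _secondary.put(key, current);
    // Conditional remove: only succeeds if the primary store still holds the
    // value we just copied, i.e. the key has not been updated since the read.
    _primary.remove(key, current);
  }

  public static void main(String[] args) {
    TransferKeySketch store = new TransferKeySketch();
    store._primary.put("pk-cold", 100L);
    store.transferKey("pk-cold");
    System.out.println("primary=" + store._primary + " secondary=" + store._secondary);
  }
}
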
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 6ebd08c74c..d59a8b3dda 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -102,6 +102,11 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
   void removeExpiredPrimaryKeys();
 
+  /**
+   * Moves keys that were once hot but have since gone cold from primary to secondary storage.
+   */
+  void transferKeysBetweenPrimaryAndSecondaryStorage();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
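
The new interface method gives management code a single hook for demoting cold keys. As an illustration only -- the
scheduling below is hypothetical and not part of this patch, and Pinot's actual upsert TTL handling decides when
these calls happen -- a periodic driver over a table's partition managers could look like this:

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;

// Hypothetical driver, shown only to demonstrate the intended call pattern for
// the new PartitionUpsertMetadataManager hook.
public class ColdKeyTransferDriver {
  private final ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();

  public void start(List<PartitionUpsertMetadataManager> partitionManagers, long periodSeconds) {
    _scheduler.scheduleWithFixedDelay(() -> {
      for (PartitionUpsertMetadataManager manager : partitionManagers) {
        manager.transferKeysBetweenPrimaryAndSecondaryStorage();
      }
    }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
  }

  public void stop() {
    _scheduler.shutdownNow();
  }
}
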
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index ec2b89065e..d25bc02728 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -24,11 +24,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
-import jnr.ffi.annotations.In;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -141,7 +139,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
     trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
-    assertEquals(recordLocationMap.size(), 3);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 3);
     checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
@@ -171,7 +169,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -184,7 +182,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(emptySegment);
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -201,7 +199,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // original segment1: 1 -> {4, 120} (not in the map)
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -214,7 +212,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -227,7 +225,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.removeSegment(emptySegment);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -239,7 +237,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.removeSegment(segment2);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 1);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
@@ -251,7 +249,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // Remove new segment1, should be no-op
     upsertMetadataManager.removeSegment(newSegment1);
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 1);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -306,7 +304,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
     trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
-    assertEquals(recordLocationMap.size(), 3);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 3);
     checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
@@ -339,7 +337,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -354,7 +352,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(emptySegment);
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -376,7 +374,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // original segment1: 1 -> {4, 120} (not in the map)
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -392,7 +390,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -408,7 +406,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.removeSegment(emptySegment);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -422,7 +420,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.removeSegment(segment2);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 1);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
@@ -436,7 +434,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // Remove new segment1, should be no-op
     upsertMetadataManager.removeSegment(newSegment1);
     // new segment1: 1 -> {4, 120}
-    assertEquals(recordLocationMap.size(), 1);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -836,7 +834,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // load segment1.
     upsertMetadataManager.addSegment(segment1, validDocIds1, null,
         getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator());
-    assertEquals(recordLocationMap.size(), 5);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
     checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
@@ -845,7 +843,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: largerComparisonValue
     upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false));
-    assertEquals(recordLocationMap.size(), 5);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
     checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
@@ -854,7 +852,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // records before (largest seen timestamp - TTL) are expired and removed from upsertMetadata.
     upsertMetadataManager.removeExpiredPrimaryKeys();
-    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 4);
     checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
@@ -897,7 +895,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // load segment1 with segmentEndTime: 80, largest seen timestamp: 80. the segment will be loaded.
     upsertMetadataManager.addSegment(segment1, validDocIds1, null,
         getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator());
-    assertEquals(recordLocationMap.size(), 5);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
     checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
@@ -908,7 +906,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     // Add record to update largestSeenTimestamp, largest seen timestamp: 120
     upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(0), 0, new Double(120), false));
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1, 2, 3});
-    assertEquals(recordLocationMap.size(), 5);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
     checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
@@ -927,7 +925,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
             new Double(80), validDocIdsSnapshot2);
     upsertMetadataManager.addSegment(segment2);
     // out of ttl segment should not be added to recordLocationMap
-    assertEquals(recordLocationMap.size(), 5);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 5);
   }
 
   private void verifyAddSegmentForTTL(Comparable comparisonValue) {
@@ -961,7 +959,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // load segment1.
     upsertMetadataManager.addSegment(segment1);
-    assertEquals(recordLocationMap.size(), 1);
+    assertEquals(recordLocationMap.sizeOfPrimaryStore(), 1);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
