This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch pluggable_store
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 267dfbad0dbbab1b30274b571a81f24f3d4f35a5
Author: Atri Sharma <[email protected]>
AuthorDate: Mon Sep 4 10:13:00 2023 +0530

    Intermediate First Working Commit
---
 ...oncurrentMapPartitionUpsertMetadataManager.java |   3 +-
 .../segment/local/upsert/IntelligentKVStore.java   | 173 +++++++++++++++++++++
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  21 +--
 3 files changed, 186 insertions(+), 11 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 089ad01430..306b64db02 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -51,7 +51,8 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
 public class ConcurrentMapPartitionUpsertMetadataManager extends 
BasePartitionUpsertMetadataManager {
 
   @VisibleForTesting
-  final ConcurrentHashMap<Object, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+  final IntelligentKVStore _primaryKeyToRecordLocationMap =
+      new IntelligentKVStore(new ConcurrentHashMap<>());
 
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, 
int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, 
@Nullable String deleteRecordColumn,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
new file mode 100644
index 0000000000..18e115dd17
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/IntelligentKVStore.java
@@ -0,0 +1,173 @@
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+
+public class IntelligentKVStore {
+  @VisibleForTesting
+  final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
+      _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+  final Map<Object, 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> _offheapStore;
+
+  public IntelligentKVStore(Map<Object, 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> offheapStore) {
+    _offheapStore = offheapStore;
+  }
+
+  public int size() {
+    return _primaryKeyToRecordLocationMap.size() + _offheapStore.size();
+  }
+
+  public boolean isEmpty() {
+    return _primaryKeyToRecordLocationMap.isEmpty() && _offheapStore.isEmpty();
+  }
+
+  public boolean containsKey(Object key) {
+    return _primaryKeyToRecordLocationMap.containsKey(key) || 
_offheapStore.containsKey(key);
+  }
+
+  public boolean containsValue(Object value) {
+    return _primaryKeyToRecordLocationMap.containsValue(value) || 
_offheapStore.containsValue(value);
+  }
+
+  public ConcurrentMapPartitionUpsertMetadataManager.RecordLocation get(Object 
key) {
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation value = 
_primaryKeyToRecordLocationMap.get(key);
+
+    if (value == null) {
+      return _offheapStore.get(key);
+    }
+
+    return value;
+  }
+
+  public Object put(Object key, Object value) {
+
+    if (!(value instanceof 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation)) {
+      throw new IllegalArgumentException("Value should be of type 
RecordLocation");
+    }
+
+    // We do a double put
+    _primaryKeyToRecordLocationMap.putIfAbsent(key, 
(ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value);
+    _offheapStore.put(key, 
(ConcurrentMapPartitionUpsertMetadataManager.RecordLocation) value);
+
+    return value;
+  }
+
+  public Object remove(Object key) {
+    // Remove in all maps where the key is present
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation firstValue = 
_primaryKeyToRecordLocationMap.remove(key);
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation secondValue = 
_offheapStore.remove(key);
+
+    if (firstValue == null) {
+      return secondValue;
+    }
+
+    return firstValue;
+  }
+
+  public void clear() {
+    _primaryKeyToRecordLocationMap.clear();
+    _offheapStore.clear();
+  }
+
+  ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
computeIfPresent(Object key,
+      BiFunction<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
+          extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
remappingFunction) {
+    // TODO: Concurrency checks
+    if (_primaryKeyToRecordLocationMap.containsKey(key)) {
+      return computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, 
remappingFunction);
+    } else {
+      return computeIfPresentInternal(_offheapStore, key, remappingFunction);
+    }
+  }
+
+  ConcurrentMapPartitionUpsertMetadataManager.RecordLocation 
computeIfPresentInternal(
+      Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
map, Object key,
+      BiFunction<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
+          extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
remappingFunction) {
+    if (map.get(key) != null) {
+      ConcurrentMapPartitionUpsertMetadataManager.RecordLocation oldValue = 
map.get(key);
+      ConcurrentMapPartitionUpsertMetadataManager.RecordLocation newValue = 
remappingFunction.apply(key, oldValue);
+      if (newValue != null) {
+        map.put(key, newValue);
+        return newValue;
+      } else {
+        map.remove(key);
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  ConcurrentMapPartitionUpsertMetadataManager.RecordLocation compute(Object 
key,
+      BiFunction<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
+          extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
remappingFunction) {
+
+    if (_primaryKeyToRecordLocationMap.containsKey(key)) {
+      return computeInternal(_primaryKeyToRecordLocationMap, key, 
remappingFunction);
+    } else {
+      return computeInternal(_offheapStore, key, remappingFunction);
+    }
+  }
+
+  ConcurrentMapPartitionUpsertMetadataManager.RecordLocation computeInternal(
+      Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
map, Object key,
+      BiFunction<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation, ?
+          extends ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
remappingFunction) {
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation oldValue = 
map.get(key);
+    ConcurrentMapPartitionUpsertMetadataManager.RecordLocation newValue = 
remappingFunction.apply(key, oldValue);
+    if (oldValue != null) {
+      if (newValue != null) {
+        map.put(key, newValue);
+
+        return newValue;
+      } else {
+        map.remove(key);
+
+        return null;
+      }
+    } else {
+      if (newValue != null) {
+        map.put(key, newValue);
+      }
+      return null;
+    }
+  }
+
+  void forEach(BiConsumer<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
+    //TODO: Extend this to offheap store as well
+    forEachInternal(_primaryKeyToRecordLocationMap, action);
+  }
+
+  void forEachInternal(Map<Object, 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> map,
+      BiConsumer<? super Object, ? super 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> action) {
+    for (Map.Entry<Object, 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> entry : 
map.entrySet()) {
+      action.accept(entry.getKey(), entry.getValue());
+    }
+  }
+
+  boolean remove(Object key,
+      Object value) {
+
+    if (_primaryKeyToRecordLocationMap.containsKey(key)) {
+      return removeInternal(_primaryKeyToRecordLocationMap, key, value);
+    }
+
+    return removeInternal(_offheapStore, key, value);
+  }
+
+  boolean removeInternal(Map<Object, 
ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> map, Object key,
+      Object value) {
+    if (map.containsKey(key) && Objects.equals(map.get(key), value)) {
+      map.remove(key);
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 363ed18e13..ec2b89065e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
+import jnr.ffi.annotations.In;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -115,7 +116,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
-    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
     // Add the first segment
@@ -278,7 +279,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), deleteRecordColumn, 
hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
-    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
     // Add the first segment
@@ -536,7 +537,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     return new PrimaryKey(new Object[]{value});
   }
 
-  private static void checkRecordLocation(Map<Object, RecordLocation> 
recordLocationMap, int keyValue,
+  private static void checkRecordLocation(IntelligentKVStore 
recordLocationMap, int keyValue,
       IndexSegment segment, int docId, int comparisonValue, HashFunction 
hashFunction) {
     RecordLocation recordLocation =
         
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), 
hashFunction));
@@ -561,7 +562,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
-    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -652,7 +653,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
-    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -708,7 +709,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), deleteColumn, 
hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
-    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // queryableDocIds is same as validDocIds in the absence of delete markers
     // Add the first segment
@@ -809,7 +810,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, tableDir,
             mock(ServerMetrics.class));
-    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
recordLocationMap =
+    IntelligentKVStore recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 
earlierComparisonValue
@@ -870,7 +871,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, tableDir,
             mock(ServerMetrics.class));
-    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
recordLocationMap =
+    IntelligentKVStore recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 80
@@ -936,7 +937,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, tableDir,
             mock(ServerMetrics.class));
-    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
recordLocationMap =
+    IntelligentKVStore recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 
comparisonValue
@@ -975,7 +976,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   // Add the following utils function since the Comparison column is a long 
value for TTL enabled upsert table.
-  private static void checkRecordLocationForTTL(Map<Object, RecordLocation> 
recordLocationMap, int keyValue,
+  private static void checkRecordLocationForTTL(IntelligentKVStore 
recordLocationMap, int keyValue,
       IndexSegment segment, int docId, Number comparisonValue, HashFunction 
hashFunction) {
     RecordLocation recordLocation =
         
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), 
hashFunction));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to