This is an automated email from the ASF dual-hosted git repository. atri pushed a commit to branch pluggable_store in repository https://gitbox.apache.org/repos/asf/pinot.git
// Provenance: apache/pinot, branch pluggable_store, commit 267dfbad0dbbab1b30274b571a81f24f3d4f35a5
// ("Intermediate First Working Commit").
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;


/**
 * Two-tier key/value store that maps primary keys to {@link RecordLocation}s.
 *
 * <p>Reads consult the in-memory {@link #_primaryKeyToRecordLocationMap} first and fall back to the caller-supplied
 * {@link #_offheapStore}. {@link #put(Object, Object)} writes to both tiers. Every other mutation is applied to the
 * tier that currently holds the key and, when that is the in-memory tier, mirrored into the second store so that
 * the two tiers never disagree about a key.
 *
 * <p>{@code ConcurrentMapPartitionUpsertMetadataManager} creates its instance as
 * {@code new IntelligentKVStore(new ConcurrentHashMap<>())}.
 *
 * <p>Thread-safety: operations on a single tier are delegated to the backing map and are only as atomic as that map
 * makes them. Operations that touch both tiers are NOT atomic as a whole.
 * TODO: Concurrency checks
 */
public class IntelligentKVStore {
  @VisibleForTesting
  final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
  final Map<Object, RecordLocation> _offheapStore;

  /**
   * @param offheapStore second-tier store; must not be {@code null}
   * @throws NullPointerException if {@code offheapStore} is {@code null}
   */
  public IntelligentKVStore(Map<Object, RecordLocation> offheapStore) {
    // Fail fast instead of deferring the NPE to the first operation that touches the second tier.
    _offheapStore = Objects.requireNonNull(offheapStore, "offheapStore");
  }

  /**
   * Returns the number of distinct keys held across both tiers.
   *
   * <p>A key written by {@link #put(Object, Object)} lives in both tiers, so simply adding the two sizes would count
   * it twice; keys of the in-memory tier are therefore only counted when the second store does not hold them.
   */
  public int size() {
    int distinctKeys = _offheapStore.size();
    for (Object key : _primaryKeyToRecordLocationMap.keySet()) {
      if (!_offheapStore.containsKey(key)) {
        distinctKeys++;
      }
    }
    return distinctKeys;
  }

  /** Returns {@code true} when neither tier holds any mapping. */
  public boolean isEmpty() {
    return _primaryKeyToRecordLocationMap.isEmpty() && _offheapStore.isEmpty();
  }

  /** Returns {@code true} when either tier holds a mapping for {@code key}. */
  public boolean containsKey(Object key) {
    return _primaryKeyToRecordLocationMap.containsKey(key) || _offheapStore.containsKey(key);
  }

  /** Returns {@code true} when either tier maps at least one key to {@code value}. */
  public boolean containsValue(Object value) {
    return _primaryKeyToRecordLocationMap.containsValue(value) || _offheapStore.containsValue(value);
  }

  /**
   * Looks up {@code key}, preferring the in-memory tier.
   *
   * @return the mapped location, or {@code null} when neither tier holds the key
   */
  public RecordLocation get(Object key) {
    RecordLocation value = _primaryKeyToRecordLocationMap.get(key);
    return value != null ? value : _offheapStore.get(key);
  }

  /**
   * Stores {@code value} under {@code key} in both tiers, replacing any existing mapping.
   *
   * @return the {@code value} that was passed in (NOT the previous mapping)
   * @throws IllegalArgumentException if {@code value} is not a {@link RecordLocation} (this includes {@code null})
   */
  public Object put(Object key, Object value) {
    if (!(value instanceof RecordLocation)) {
      throw new IllegalArgumentException("Value should be of type RecordLocation");
    }
    RecordLocation recordLocation = (RecordLocation) value;

    // Double put. Both tiers must be overwritten: get() reads the in-memory tier first, so leaving an older value
    // there (as putIfAbsent would) makes the new value invisible to readers.
    _primaryKeyToRecordLocationMap.put(key, recordLocation);
    _offheapStore.put(key, recordLocation);

    return value;
  }

  /**
   * Removes {@code key} from every tier that holds it.
   *
   * @return the value removed from the in-memory tier if there was one, otherwise the value removed from the second
   *         store, or {@code null} when the key was absent from both
   */
  public Object remove(Object key) {
    RecordLocation inMemoryValue = _primaryKeyToRecordLocationMap.remove(key);
    RecordLocation offheapValue = _offheapStore.remove(key);
    return inMemoryValue != null ? inMemoryValue : offheapValue;
  }

  /** Removes every mapping from both tiers. */
  public void clear() {
    _primaryKeyToRecordLocationMap.clear();
    _offheapStore.clear();
  }

  /**
   * Recomputes the mapping for {@code key} if one exists, in the tier that holds the key.
   *
   * @return the new value, or {@code null} when the key was absent or the function removed the mapping
   */
  RecordLocation computeIfPresent(Object key,
      BiFunction<? super Object, ? super RecordLocation, ? extends RecordLocation> remappingFunction) {
    // TODO: Concurrency checks (the tier selection below is check-then-act)
    if (!_primaryKeyToRecordLocationMap.containsKey(key)) {
      return computeIfPresentInternal(_offheapStore, key, remappingFunction);
    }
    RecordLocation newValue = computeIfPresentInternal(_primaryKeyToRecordLocationMap, key, remappingFunction);
    mirrorToOffheapStore(key, newValue);
    return newValue;
  }

  /**
   * Applies {@link Map#computeIfPresent} to {@code map}.
   *
   * @return the new value, or {@code null} when the key was absent or the function removed the mapping
   */
  RecordLocation computeIfPresentInternal(Map<Object, RecordLocation> map, Object key,
      BiFunction<? super Object, ? super RecordLocation, ? extends RecordLocation> remappingFunction) {
    return map.computeIfPresent(key, remappingFunction);
  }

  /**
   * Computes a new mapping for {@code key} from its current value ({@code null} when absent), in the tier that
   * holds the key; keys unknown to the in-memory tier are computed in the second store.
   *
   * @return the new value, or {@code null} when the function returned {@code null} (mapping removed or not created)
   */
  RecordLocation compute(Object key,
      BiFunction<? super Object, ? super RecordLocation, ? extends RecordLocation> remappingFunction) {
    // TODO: Concurrency checks (the tier selection below is check-then-act)
    if (!_primaryKeyToRecordLocationMap.containsKey(key)) {
      return computeInternal(_offheapStore, key, remappingFunction);
    }
    RecordLocation newValue = computeInternal(_primaryKeyToRecordLocationMap, key, remappingFunction);
    mirrorToOffheapStore(key, newValue);
    return newValue;
  }

  /**
   * Applies {@link Map#compute} to {@code map}.
   *
   * @return the value now mapped to {@code key} (also when the mapping was newly created), or {@code null} when the
   *         function returned {@code null}
   */
  RecordLocation computeInternal(Map<Object, RecordLocation> map, Object key,
      BiFunction<? super Object, ? super RecordLocation, ? extends RecordLocation> remappingFunction) {
    return map.compute(key, remappingFunction);
  }

  /**
   * Invokes {@code action} for each entry of the in-memory tier.
   *
   * <p>NOTE: entries that live only in the second store are currently NOT visited.
   */
  void forEach(BiConsumer<? super Object, ? super RecordLocation> action) {
    //TODO: Extend this to offheap store as well
    forEachInternal(_primaryKeyToRecordLocationMap, action);
  }

  /** Invokes {@code action} for each entry of {@code map}. */
  void forEachInternal(Map<Object, RecordLocation> map, BiConsumer<? super Object, ? super RecordLocation> action) {
    map.forEach(action);
  }

  /**
   * Removes {@code key} only if it is currently mapped to {@code value}. The comparison is made against the tier
   * that holds the key (in-memory tier first).
   *
   * @return {@code true} if the mapping was removed
   */
  boolean remove(Object key, Object value) {
    if (!_primaryKeyToRecordLocationMap.containsKey(key)) {
      return removeInternal(_offheapStore, key, value);
    }
    boolean removed = removeInternal(_primaryKeyToRecordLocationMap, key, value);
    if (removed) {
      // Drop the second-tier copy as well; otherwise the key would reappear on the next get()/containsKey().
      _offheapStore.remove(key);
    }
    return removed;
  }

  /**
   * Applies {@link Map#remove(Object, Object)} to {@code map}.
   *
   * @return {@code true} if {@code key} was mapped to {@code value} and has been removed
   */
  boolean removeInternal(Map<Object, RecordLocation> map, Object key, Object value) {
    return map.remove(key, value);
  }

  /**
   * Propagates the outcome of a mutation on the in-memory tier to the second store: a {@code null} result removes
   * the key there, any other result overwrites it.
   */
  private void mirrorToOffheapStore(Object key, RecordLocation newValue) {
    if (newValue == null) {
      _offheapStore.remove(key);
    } else {
      _offheapStore.put(key, newValue);
    }
  }
}
upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; // Add the first segment @@ -278,7 +279,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); - Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; // Add the first segment @@ -536,7 +537,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { return new PrimaryKey(new Object[]{value}); } - private static void checkRecordLocation(Map<Object, RecordLocation> recordLocationMap, int keyValue, + private static void checkRecordLocation(IntelligentKVStore recordLocationMap, int keyValue, IndexSegment segment, int docId, int comparisonValue, HashFunction hashFunction) { RecordLocation recordLocation = recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction)); @@ -561,7 +562,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); - Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} @@ 
-652,7 +653,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); - Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} @@ -708,7 +709,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, 0, INDEX_DIR, mock(ServerMetrics.class)); - Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // queryableDocIds is same as validDocIds in the absence of delete markers // Add the first segment @@ -809,7 +810,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir, mock(ServerMetrics.class)); - Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add record to update largestSeenTimestamp, largest seen timestamp: earlierComparisonValue @@ -870,7 +871,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), 
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir, mock(ServerMetrics.class)); - Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add record to update largestSeenTimestamp, largest seen timestamp: 80 @@ -936,7 +937,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir, mock(ServerMetrics.class)); - Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = + IntelligentKVStore recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add record to update largestSeenTimestamp, largest seen timestamp: comparisonValue @@ -975,7 +976,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { } // Add the following utils function since the Comparison column is a long value for TTL enabled upsert table. - private static void checkRecordLocationForTTL(Map<Object, RecordLocation> recordLocationMap, int keyValue, + private static void checkRecordLocationForTTL(IntelligentKVStore recordLocationMap, int keyValue, IndexSegment segment, int docId, Number comparisonValue, HashFunction hashFunction) { RecordLocation recordLocation = recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
