This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch pluggable_store
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/pluggable_store by this push:
     new 4877348b4a Add new config for offheap storage TTL
4877348b4a is described below

commit 4877348b4ab4c6e1ae401f205e2e8fcaed821d2f
Author: Atri Sharma <[email protected]>
AuthorDate: Wed Sep 6 00:27:39 2023 +0530

    Add new config for offheap storage TTL
---
 ...etadataAndDictionaryAggregationPlanMakerTest.java |  2 +-
 .../upsert/BasePartitionUpsertMetadataManager.java   |  4 +++-
 .../local/upsert/BaseTableUpsertMetadataManager.java |  2 ++
 .../ConcurrentMapPartitionUpsertMetadataManager.java |  6 +++---
 .../ConcurrentMapTableUpsertMetadataManager.java     |  2 +-
 ...currentMapPartitionUpsertMetadataManagerTest.java | 20 ++++++++++----------
 .../apache/pinot/spi/config/table/UpsertConfig.java  | 13 +++++++++++++
 7 files changed, 33 insertions(+), 16 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 874c8a6bed..d8e188d1ea 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,7 +131,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 
0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), null, 
HashFunction.NONE, null, false, 0, INDEX_DIR,
+            Collections.singletonList("daysSinceEpoch"), null, 
HashFunction.NONE, null, false, 0, 0, INDEX_DIR,
             serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index c2b5a8c8d0..174554204e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -69,6 +69,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected final PartialUpsertHandler _partialUpsertHandler;
   protected final boolean _enableSnapshot;
   protected final double _metadataTTL;
+  protected final double _offheapStorageTTL;
   protected final File _tableIndexDir;
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
@@ -94,7 +95,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, 
@Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler 
partialUpsertHandler, boolean enableSnapshot,
-      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+      double metadataTTL, double offheapStorageTTL, File tableIndexDir, 
ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
@@ -104,6 +105,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
     _metadataTTL = metadataTTL;
+    _offheapStorageTTL = offheapStorageTTL;
     _tableIndexDir = tableIndexDir;
     _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
     _serverMetrics = serverMetrics;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 2fecc4fbe5..9050b18696 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -69,6 +69,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
   protected double _metadataTTL;
+  protected double _offheapStorageTTL;
   protected File _tableIndexDir;
   protected ServerMetrics _serverMetrics;
   protected HelixManager _helixManager;
@@ -111,6 +112,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _metadataTTL = upsertConfig.getMetadataTTL();
+    _offheapStorageTTL = upsertConfig.getOffHeapStorageTTL();
     _tableIndexDir = tableDataManager.getTableDataDir();
     _serverMetrics = serverMetrics;
     _helixManager = helixManager;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 96e82b64ca..e3adc2bb07 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -57,9 +57,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, 
int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, 
@Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler 
partialUpsertHandler, boolean enableSnapshot,
-      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+      double metadataTTL, double offheapStorageTTL, File tableIndexDir, 
ServerMetrics serverMetrics) {
     super(tableNameWithType, partitionId, primaryKeyColumns, 
comparisonColumns, deleteRecordColumn, hashFunction,
-        partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, 
serverMetrics);
+        partialUpsertHandler, enableSnapshot, metadataTTL, offheapStorageTTL, 
tableIndexDir, serverMetrics);
   }
 
   @Override
@@ -239,7 +239,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
   @Override
   public void doTransferColdKeysToOffheapStorage() {
     //TODO: Declare own TTL
-    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    double threshold = _largestSeenComparisonValue - _offheapStorageTTL;
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
       if (((Number) recordLocation.getComparisonValue()).doubleValue() < 
threshold) {
         _primaryKeyToRecordLocationMap.transferKey(primaryKey);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 3380203656..0f428af596 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,7 +37,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends 
BaseTableUpsertMeta
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new 
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, 
_primaryKeyColumns,
             _comparisonColumns, _deleteRecordColumn, _hashFunction, 
_partialUpsertHandler,
-            _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
+            _enableSnapshot, _metadataTTL, _offheapStorageTTL, _tableIndexDir, 
_serverMetrics));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 76f7f7339e..f848561fe8 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -126,7 +126,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -289,7 +289,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, 
hashFunction, null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, 
hashFunction, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -572,7 +572,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -663,7 +663,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, 
null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -719,7 +719,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteColumn, 
hashFunction, null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), deleteColumn, 
hashFunction, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -820,7 +820,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, 0, tableDir,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -880,7 +880,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, false, 30, 30, tableDir,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -937,7 +937,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, 0, tableDir,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -1002,7 +1002,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 30, 0, tableDir,
             mock(ServerMetrics.class));
     IntelligentKVStore recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -1064,7 +1064,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
   private void verifyPersistAndLoadWatermark() {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 10, INDEX_DIR,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, 
null, true, 10, 0, INDEX_DIR,
             mock(ServerMetrics.class));
 
     double currentTimeMs = System.currentTimeMillis();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 2e2a3f645a..35385d0302 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -63,6 +63,10 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it 
uses the same unit as comparison col")
   private double _metadataTTL;
 
+  @JsonPropertyDescription("Whether to use TTL for cold key transfer to 
offheap storage, it uses the same unit as "
+      + "comparison col")
+  private double _offHeapStorageTTL;
+
   @JsonPropertyDescription("Whether to preload segments for fast upsert 
metadata recovery")
   private boolean _enablePreload;
 
@@ -118,6 +122,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _metadataTTL;
   }
 
+  public double getOffHeapStorageTTL() {
+    return _offHeapStorageTTL;
+  }
+
   public boolean isEnablePreload() {
     return _enablePreload;
   }
@@ -190,6 +198,11 @@ public class UpsertConfig extends BaseJsonConfig {
     _metadataTTL = metadataTTL;
   }
 
+  public void setOffHeapStorageTTL(double offHeapStorageTTL) {
+    _offHeapStorageTTL = offHeapStorageTTL;
+  }
+
+
   public void setEnablePreload(boolean enablePreload) {
     _enablePreload = enablePreload;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to