This is an automated email from the ASF dual-hosted git repository.
atri pushed a commit to branch pluggable_store
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/pluggable_store by this push:
new 4877348b4a Add new config for offheap storage TTL
4877348b4a is described below
commit 4877348b4ab4c6e1ae401f205e2e8fcaed821d2f
Author: Atri Sharma <[email protected]>
AuthorDate: Wed Sep 6 00:27:39 2023 +0530
Add new config for offheap storage TTL
---
...etadataAndDictionaryAggregationPlanMakerTest.java | 2 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 4 +++-
.../local/upsert/BaseTableUpsertMetadataManager.java | 2 ++
.../ConcurrentMapPartitionUpsertMetadataManager.java | 6 +++---
.../ConcurrentMapTableUpsertMetadataManager.java | 2 +-
...currentMapPartitionUpsertMetadataManagerTest.java | 20 ++++++++++----------
.../apache/pinot/spi/config/table/UpsertConfig.java | 13 +++++++++++++
7 files changed, 33 insertions(+), 16 deletions(-)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 874c8a6bed..d8e188d1ea 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,7 +131,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME",
0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), null,
HashFunction.NONE, null, false, 0, INDEX_DIR,
+ Collections.singletonList("daysSinceEpoch"), null,
HashFunction.NONE, null, false, 0, 0, INDEX_DIR,
serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index c2b5a8c8d0..174554204e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -69,6 +69,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
protected final double _metadataTTL;
+ protected final double _offheapStorageTTL;
protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -94,7 +95,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int
partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns,
@Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler
partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ double metadataTTL, double offheapStorageTTL, File tableIndexDir,
ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -104,6 +105,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
_metadataTTL = metadataTTL;
+ _offheapStorageTTL = offheapStorageTTL;
_tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 2fecc4fbe5..9050b18696 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -69,6 +69,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
protected double _metadataTTL;
+ protected double _offheapStorageTTL;
protected File _tableIndexDir;
protected ServerMetrics _serverMetrics;
protected HelixManager _helixManager;
@@ -111,6 +112,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
_enableSnapshot = upsertConfig.isEnableSnapshot();
_metadataTTL = upsertConfig.getMetadataTTL();
+ _offheapStorageTTL = upsertConfig.getOffHeapStorageTTL();
_tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
_helixManager = helixManager;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 96e82b64ca..e3adc2bb07 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -57,9 +57,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType,
int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns,
@Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler
partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ double metadataTTL, double offheapStorageTTL, File tableIndexDir,
ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns,
comparisonColumns, deleteRecordColumn, hashFunction,
- partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir,
serverMetrics);
+ partialUpsertHandler, enableSnapshot, metadataTTL, offheapStorageTTL,
tableIndexDir, serverMetrics);
}
@Override
@@ -239,7 +239,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
@Override
public void doTransferColdKeysToOffheapStorage() {
//TODO: Declare own TTL
- double threshold = _largestSeenComparisonValue - _metadataTTL;
+ double threshold = _largestSeenComparisonValue - _offheapStorageTTL;
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
if (((Number) recordLocation.getComparisonValue()).doubleValue() <
threshold) {
_primaryKeyToRecordLocationMap.transferKey(primaryKey);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 3380203656..0f428af596 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,7 +37,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMeta
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k,
_primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn, _hashFunction,
_partialUpsertHandler,
- _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
+ _enableSnapshot, _metadataTTL, _offheapStorageTTL, _tableIndexDir,
_serverMetrics));
}
@Override
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 76f7f7339e..f848561fe8 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -126,7 +126,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -289,7 +289,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn,
hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), deleteRecordColumn,
hashFunction, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -572,7 +572,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -663,7 +663,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -719,7 +719,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn,
hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), deleteColumn,
hashFunction, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -820,7 +820,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, false, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, false, 30, 0, tableDir,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -880,7 +880,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, false, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, false, 30, 30, tableDir,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -937,7 +937,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 30, 0, tableDir,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1002,7 +1002,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 30, 0, tableDir,
mock(ServerMetrics.class));
IntelligentKVStore recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1064,7 +1064,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyPersistAndLoadWatermark() {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 10, INDEX_DIR,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 10, 0, INDEX_DIR,
mock(ServerMetrics.class));
double currentTimeMs = System.currentTimeMillis();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 2e2a3f645a..35385d0302 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -63,6 +63,10 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it
uses the same unit as comparison col")
private double _metadataTTL;
+ @JsonPropertyDescription("Whether to use TTL for cold key transfer to
offheap storage, it uses the same unit as "
+ + "comparison col")
+ private double _offHeapStorageTTL;
+
@JsonPropertyDescription("Whether to preload segments for fast upsert
metadata recovery")
private boolean _enablePreload;
@@ -118,6 +122,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _metadataTTL;
}
+ public double getOffHeapStorageTTL() {
+ return _offHeapStorageTTL;
+ }
+
public boolean isEnablePreload() {
return _enablePreload;
}
@@ -190,6 +198,11 @@ public class UpsertConfig extends BaseJsonConfig {
_metadataTTL = metadataTTL;
}
+ public void setOffHeapStorageTTL(double offHeapStorageTTL) {
+ _offHeapStorageTTL = offHeapStorageTTL;
+ }
+
+
public void setEnablePreload(boolean enablePreload) {
_enablePreload = enablePreload;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]