This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5eafe76ee8 [Upsert TTL] Add Watermark for each partition for Primary
key cleanup (#10915)
5eafe76ee8 is described below
commit 5eafe76ee89adfd12007db17de7a50c5dd7a43ac
Author: deemoliu <[email protected]>
AuthorDate: Sun Jul 23 21:34:21 2023 -0700
[Upsert TTL] Add Watermark for each partition for Primary key cleanup
(#10915)
---
.../realtime/LLRealtimeSegmentDataManager.java | 2 +
...adataAndDictionaryAggregationPlanMakerTest.java | 4 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 143 ++++++++++-
.../upsert/BaseTableUpsertMetadataManager.java | 4 +
...oncurrentMapPartitionUpsertMetadataManager.java | 22 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 4 +-
.../upsert/PartitionUpsertMetadataManager.java | 5 +
.../segment/local/utils/TableConfigUtils.java | 29 +++
...rrentMapPartitionUpsertMetadataManagerTest.java | 279 +++++++++++++++++++--
.../segment/local/utils/TableConfigUtilsTest.java | 85 +++++++
.../org/apache/pinot/segment/spi/V1Constants.java | 1 +
.../mutable/ThreadSafeMutableRoaringBitmap.java | 4 +
.../pinot/spi/config/table/UpsertConfig.java | 11 +
13 files changed, 570 insertions(+), 23 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 3f655728bc..5a2d075cae 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -675,6 +675,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Take upsert snapshot before starting consuming events
if (_partitionUpsertMetadataManager != null) {
_partitionUpsertMetadataManager.takeSnapshot();
+ // If upsertTTL is enabled, we will remove expired primary keys from
upsertMetadata after taking snapshot.
+ _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
}
while (!_state.isFinal()) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 47459fb691..874c8a6bed 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME",
0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), null,
HashFunction.NONE, null, false, serverMetrics),
- new ThreadSafeMutableRoaringBitmap(), null);
+ Collections.singletonList("daysSinceEpoch"), null,
HashFunction.NONE, null, false, 0, INDEX_DIR,
+ serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}
@AfterClass
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 4632047989..6dc9d2bb7f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -20,7 +20,12 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -32,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -41,6 +47,7 @@ import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -61,6 +68,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected final HashFunction _hashFunction;
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
+ protected final double _metadataTTL;
+ protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -78,10 +87,14 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
protected int _numOutOfOrderEvents = 0;
+ // Used to maintain the largestSeenComparisonValue to avoid handling
out-of-ttl segments/records.
+ // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to
compute expired segments.
+ protected volatile double _largestSeenComparisonValue;
+
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int
partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns,
@Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler
partialUpsertHandler, boolean enableSnapshot,
- ServerMetrics serverMetrics) {
+ double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -90,9 +103,16 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
_hashFunction = hashFunction;
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
+ _metadataTTL = metadataTTL;
+ _tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
+ if (metadataTTL > 0) {
+ _largestSeenComparisonValue = loadWatermark();
+ } else {
+ deleteWatermark();
+ }
}
@Override
@@ -114,13 +134,36 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: {} for segment: {}, table:
{}", segment.getClass(), segmentName,
_tableNameWithType);
+ ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+
+ // Skip adding segment that has max comparison value smaller than
(largestSeenComparisonValue - TTL)
+ if (_largestSeenComparisonValue > 0) {
+ Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot
enabled");
+ Preconditions.checkState(_comparisonColumns.size() == 1,
+ "Upsert TTL does not work with multiple comparison columns");
+ // TODO: Support deletion for TTL. Need to construct queryableDocIds
when adding segments out of TTL.
+ Preconditions.checkState(_deleteRecordColumn == null, "Upsert TTL
doesn't work with record deletion");
+ Number maxComparisonValue =
+ (Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+ if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue -
_metadataTTL) {
+ _logger.info("Skip adding segment: {} because it's out of TTL",
segmentName);
+ MutableRoaringBitmap validDocIdsSnapshot =
immutableSegment.loadValidDocIdsFromSnapshot();
+ if (validDocIdsSnapshot != null) {
+ immutableSegment.enableUpsert(this, new
ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), null);
+ } else {
+ _logger.warn("Failed to find snapshot from segment: {} which is out
of TTL, treating all documents as valid",
+ segmentName);
+ }
+ return;
+ }
+ }
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
startOperation();
try {
- doAddSegment((ImmutableSegmentImpl) segment);
+ doAddSegment(immutableSegment);
_trackedSegments.add(segment);
} finally {
finishOperation();
@@ -365,6 +408,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
IndexSegment oldSegment) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType,
segmentName);
+
+ // Currently when TTL is enabled, we don't support skip loading out-of-TTL
segment with snapshots, since we don't
+ // know which docs are valid in the new segment.
+ // TODO: when ttl is enabled, we can allow
+ // (1) skip loading segments without any invalid docs.
+ // (2) assign the invalid docs from the replaced segment to the new
segment.
segmentLock.lock();
try {
MutableRoaringBitmap validDocIdsForOldSegment =
@@ -419,6 +468,16 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
return;
}
+ // Skip removing segment that has max comparison value smaller than
(largestSeenComparisonValue - TTL)
+ if (_largestSeenComparisonValue > 0) {
+ Number maxComparisonValue =
+ (Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+ if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue -
_metadataTTL) {
+ _logger.info("Skip removing segment: {} because it's out of TTL",
segmentName);
+ return;
+ }
+ }
+
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
@@ -544,6 +603,63 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
numImmutableSegments, numTrackedSegments, System.currentTimeMillis() -
startTimeMs);
}
+ /**
+ * Loads watermark from the file if exists.
+ */
+ protected double loadWatermark() {
+ File watermarkFile = getWatermarkFile();
+ if (watermarkFile.exists()) {
+ try {
+ byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+ double watermark = ByteBuffer.wrap(bytes).getDouble();
+ _logger.info("Loaded watermark: {} from file for table: {}
partition_id: {}", watermark, _tableNameWithType,
+ _partitionId);
+ return watermark;
+ } catch (Exception e) {
+ _logger.warn("Caught exception while loading watermark file: {},
skipping", watermarkFile);
+ }
+ }
+ return Double.MIN_VALUE;
+ }
+
+ /**
+ * Persists watermark to the file.
+ */
+ protected void persistWatermark(double watermark) {
+ File watermarkFile = getWatermarkFile();
+ try {
+ if (watermarkFile.exists()) {
+ if (!FileUtils.deleteQuietly(watermarkFile)) {
+ _logger.warn("Cannot delete watermark file: {}, skipping",
watermarkFile);
+ return;
+ }
+ }
+ try (OutputStream outputStream = new FileOutputStream(watermarkFile,
false);
+ DataOutputStream dataOutputStream = new
DataOutputStream(outputStream)) {
+ dataOutputStream.writeDouble(watermark);
+ }
+ _logger.info("Persisted watermark: {} to file: {}", watermark,
watermarkFile);
+ } catch (Exception e) {
+ _logger.warn("Caught exception while persisting watermark file: {},
skipping", watermarkFile);
+ }
+ }
+
+ /**
+ * Deletes the watermark file.
+ */
+ protected void deleteWatermark() {
+ File watermarkFile = getWatermarkFile();
+ if (watermarkFile.exists()) {
+ if (!FileUtils.deleteQuietly(watermarkFile)) {
+ _logger.warn("Cannot delete watermark file: {}, skipping",
watermarkFile);
+ }
+ }
+ }
+
+ protected File getWatermarkFile() {
+ return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION
+ _partitionId);
+ }
+
protected void startOperation() {
_numPendingOperations.getAndIncrement();
}
@@ -556,6 +672,29 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ @Override
+ public void removeExpiredPrimaryKeys() {
+ if (_metadataTTL <= 0) {
+ return;
+ }
+ if (_stopped) {
+ _logger.info("Skip removing expired primary keys because metadata
manager is already stopped");
+ return;
+ }
+
+ startOperation();
+ try {
+ doRemoveExpiredPrimaryKeys();
+ } finally {
+ finishOperation();
+ }
+ }
+
+ /**
+ * Removes all primary keys that have comparison value smaller than
(largestSeenComparisonValue - TTL).
+ */
+ protected abstract void doRemoveExpiredPrimaryKeys();
+
@Override
public void stop() {
_stopped = true;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index c6434ec487..e23689ad09 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -67,6 +67,8 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
protected HashFunction _hashFunction;
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
+ protected double _metadataTTL;
+ protected File _tableIndexDir;
protected ServerMetrics _serverMetrics;
private volatile boolean _isPreloading = false;
@@ -104,6 +106,8 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
}
_enableSnapshot = upsertConfig.isEnableSnapshot();
+ _metadataTTL = upsertConfig.getMetadataTTL();
+ _tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
if (_enableSnapshot && segmentPreloadExecutor != null &&
upsertConfig.isEnablePreload()) {
// Preloading the segments with snapshots for fast upsert metadata
recovery.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index cae8092b2f..3ce09412e3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -59,9 +60,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType,
int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns,
@Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler
partialUpsertHandler, boolean enableSnapshot,
- ServerMetrics serverMetrics) {
+ double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns,
comparisonColumns, deleteRecordColumn, hashFunction,
- partialUpsertHandler, enableSnapshot, serverMetrics);
+ partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir,
serverMetrics);
}
@Override
@@ -205,6 +206,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
assert !validDocIds.isEmpty();
+
PrimaryKey primaryKey = new PrimaryKey(new
Object[_primaryKeyColumns.size()]);
PeekableIntIterator iterator = validDocIds.getIntIterator();
try (
@@ -226,6 +228,17 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
}
}
+ @Override
+ public void doRemoveExpiredPrimaryKeys() {
+ double threshold = _largestSeenComparisonValue - _metadataTTL;
+ _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+ if (((Number) recordLocation.getComparisonValue()).doubleValue() <
threshold) {
+ _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ }
+ });
+ persistWatermark(_largestSeenComparisonValue);
+ }
+
@Override
protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds =
Objects.requireNonNull(segment.getValidDocIds());
@@ -234,6 +247,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
Comparable newComparisonValue = recordInfo.getComparisonValue();
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
(primaryKey, currentRecordLocation) -> {
+ // Update largestSeenComparisonValue when adding new record
+ if (_metadataTTL > 0) {
+ double comparisonValue = ((Number)
recordInfo.getComparisonValue()).doubleValue();
+ _largestSeenComparisonValue =
Math.max(_largestSeenComparisonValue, comparisonValue);
+ }
if (currentRecordLocation != null) {
// Existing primary key
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index b08ea591d1..3380203656 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,8 +36,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMeta
public ConcurrentMapPartitionUpsertMetadataManager
getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k,
_primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn, _hashFunction,
_partialUpsertHandler, _enableSnapshot,
- _serverMetrics));
+ _comparisonColumns, _deleteRecordColumn, _hashFunction,
_partialUpsertHandler,
+ _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index fbf864bd58..6ebd08c74c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -97,6 +97,11 @@ public interface PartitionUpsertMetadataManager extends
Closeable {
*/
void takeSnapshot();
+ /**
+ * Remove the expired primary keys from the metadata when TTL is enabled.
+ */
+ void removeExpiredPrimaryKeys();
+
/**
* Stops the metadata manager. After invoking this method, no access to the
metadata will be accepted.
*/
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 76cba334e5..d5741fa0b7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -611,6 +611,35 @@ public final class TableConfigUtils {
}
}
validateAggregateMetricsForUpsertConfig(tableConfig);
+ validateTTLForUpsertConfig(tableConfig, schema);
+ }
+
+ /**
+ * Validates the upsert config related to TTL.
+ */
+ @VisibleForTesting
+ static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema
schema) {
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ if (upsertConfig == null || upsertConfig.getMetadataTTL() == 0) {
+ return;
+ }
+
+ List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+ if (CollectionUtils.isNotEmpty(comparisonColumns)) {
+ Preconditions.checkState(comparisonColumns.size() == 1,
+ "Upsert TTL does not work with multiple comparison columns");
+ String comparisonColumn = comparisonColumns.get(0);
+ DataType comparisonColumnDataType =
schema.getFieldSpecFor(comparisonColumn).getDataType();
+ Preconditions.checkState(comparisonColumnDataType.isNumeric(),
+ "Upsert TTL must have comparison column: %s in numeric type, found:
%s", comparisonColumn,
+ comparisonColumnDataType);
+ }
+
+ Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must
have snapshot enabled");
+
+ // TODO: Support deletion for TTL. Need to construct queryableDocIds when
adding segments out of TTL.
+ Preconditions.checkState(upsertConfig.getDeleteRecordColumn() == null,
+ "Upsert TTL doesn't work with record deletion");
}
/**
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 466f1885d5..fe3907ada3 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -25,15 +26,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
@@ -45,7 +50,7 @@ import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -56,11 +61,21 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
"ConcurrentMapPartitionUpsertMetadataManagerTest");
+
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+ FileUtils.forceMkdir(INDEX_DIR);
+ }
@Test
public void testAddReplaceRemoveSegment()
@@ -73,12 +88,27 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
}
+ @Test
+ public void testUpsertMetadataCleanupWithTTLConfig() {
+ verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
+ verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
+ verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
+ verifyRemoveExpiredPrimaryKeys(new Long(80), new Long(120));
+ verifyPersistAndLoadWatermark();
+ verifyAddSegmentForTTL(new Integer(80));
+ verifyAddSegmentForTTL(new Float(80));
+ verifyAddSegmentForTTL(new Double(80));
+ verifyAddSegmentForTTL(new Long(80));
+ verifyAddOutOfTTLSegment();
+ }
+
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction,
boolean enableSnapshot)
throws IOException {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -240,7 +270,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn,
hashFunction, null, false,
+ Collections.singletonList(comparisonColumn), deleteRecordColumn,
hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -309,7 +339,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
// Add an empty segment
@@ -324,7 +354,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
// Replace (reload) the first segment
@@ -347,9 +377,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1);
@@ -363,9 +393,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove the empty segment
upsertMetadataManager.removeSegment(emptySegment);
@@ -379,7 +409,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove segment2
upsertMetadataManager.removeSegment(segment2);
@@ -391,7 +421,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Stop the metadata manager
upsertMetadataManager.stop();
@@ -403,7 +433,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Close the metadata manager
upsertMetadataManager.close();
@@ -458,6 +488,25 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
return segment;
}
+ private static ImmutableSegmentImpl mockImmutableSegmentWithEndTime(int
sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, List<String> comparisonColumns, Comparable
endTime, MutableRoaringBitmap snapshot) {
+ ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber,
validDocIds, queryableDocIds, primaryKeys);
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+ when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+ this.put(comparisonColumns.get(0), columnMetadata);
+ }});
+ when(columnMetadata.getMaxValue()).thenReturn(endTime);
+ if (snapshot != null) {
+ when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
+ } else {
+
when(segment.loadValidDocIdsFromSnapshot()).thenReturn(validDocIds.getMutableRoaringBitmap());
+ }
+ return segment;
+ }
+
private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
@@ -504,7 +553,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -594,7 +644,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction,
null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -642,13 +693,14 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddRecordWithDeleteColumn(HashFunction.MD5);
verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
}
+
private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
throws IOException {
String comparisonColumn = "timeCol";
String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn,
hashFunction, null, false,
+ Collections.singletonList(comparisonColumn), deleteColumn,
hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -744,6 +796,203 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ private void verifyRemoveExpiredPrimaryKeys(Comparable
earlierComparisonValue, Comparable largerComparisonValue) {
+ File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, false, 30, tableDir,
+ mock(ServerMetrics.class));
+ Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
recordLocationMap =
+ upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // Add record to update largestSeenTimestamp, largest seen timestamp:
earlierComparisonValue
+ ThreadSafeMutableRoaringBitmap validDocIds0 = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+ upsertMetadataManager.addRecord(segment0, new
RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false));
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+
+ // Add a segment with segmentEndTime = earlierComparisonValue, so it will
not be skipped
+ int numRecords = 4;
+ int[] primaryKeys = new int[]{0, 1, 2, 3};
+ Number[] timestamps = new Number[]{100, 100, 120, 80};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1,
Collections.singletonList("timeCol"),
+ earlierComparisonValue, null);
+
+ int[] docIds1 = new int[]{0, 1, 2, 3};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+
+ // load segment1.
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoListForTTL(numRecords, primaryKeys,
timestamps).iterator());
+ assertEquals(recordLocationMap.size(), 5);
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+
+ // Add record to update largestSeenTimestamp, largest seen timestamp:
largerComparisonValue
+ upsertMetadataManager.addRecord(segment0, new
RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false));
+ assertEquals(recordLocationMap.size(), 5);
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120,
HashFunction.NONE);
+
+ // records before (largest seen timestamp - TTL) are expired and removed
from upsertMetadata.
+ upsertMetadataManager.removeExpiredPrimaryKeys();
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120,
HashFunction.NONE);
+
+ // ValidDocIds for out-of-ttl records should not be removed.
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2, 3});
+ }
+
+ private void verifyAddOutOfTTLSegment() {
+ File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 30, tableDir,
+ mock(ServerMetrics.class));
+ Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
recordLocationMap =
+ upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // Add record to update largestSeenTimestamp, largest seen timestamp: 80
+ ThreadSafeMutableRoaringBitmap validDocIds0 = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+ upsertMetadataManager.addRecord(segment0, new
RecordInfo(makePrimaryKey(10), 1, new Double(80), false));
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+
+ // Add a segment with segmentEndTime = 80, so it will not be skipped
+ int numRecords = 4;
+ int[] primaryKeys = new int[]{0, 1, 2, 3};
+ Number[] timestamps = new Number[]{100, 100, 120, 80};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1,
Collections.singletonList("timeCol"),
+ new Double(80), null);
+
+ int[] docIds1 = new int[]{0, 1, 2, 3};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+
+ // load segment1 with segmentEndTime: 80, largest seen timestamp: 80. the
segment will be loaded.
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoListForTTL(numRecords, primaryKeys,
timestamps).iterator());
+ assertEquals(recordLocationMap.size(), 5);
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2, 3});
+
+ // Add record to update largestSeenTimestamp, largest seen timestamp: 120
+ upsertMetadataManager.addRecord(segment0, new
RecordInfo(makePrimaryKey(0), 0, new Double(120), false));
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{1, 2, 3});
+ assertEquals(recordLocationMap.size(), 5);
+ checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80,
HashFunction.NONE);
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+
+ // Add an out-of-TTL segment, and verify that the invalid docs do not show
up again.
+ // Add a segment with segmentEndTime: 80, largest seen timestamp: 120. the
segment will be skipped.
+ List<PrimaryKey> primaryKeys2 = getPrimaryKeyList(numRecords, new
int[]{100, 101, 102, 103});
+ int[] docIds2 = new int[]{0, 1};
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+ validDocIdsSnapshot2.add(docIds2);
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2,
Collections.singletonList("timeCol"),
+ new Double(80), validDocIdsSnapshot2);
+ upsertMetadataManager.addSegment(segment2);
+ // out of ttl segment should not be added to recordLocationMap
+ assertEquals(recordLocationMap.size(), 5);
+ }
+
+ private void verifyAddSegmentForTTL(Comparable comparisonValue) {
+ File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 30, tableDir,
+ mock(ServerMetrics.class));
+ Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
recordLocationMap =
+ upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // Add record to update largestSeenTimestamp, largest seen timestamp:
comparisonValue
+ ThreadSafeMutableRoaringBitmap validDocIds0 = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+ upsertMetadataManager.addRecord(segment0, new
RecordInfo(makePrimaryKey(10), 1, comparisonValue, false));
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+
+ // add a segment with segmentEndTime = -1 so it will be skipped since it
is out-of-TTL
+ int numRecords = 4;
+ int[] primaryKeys = new int[]{0, 1, 2, 3};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1,
Collections.singletonList("timeCol"), -1,
+ null);
+
+ int[] docIds1 = new int[]{0, 1, 2, 3};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+
+ // load segment1.
+ upsertMetadataManager.addSegment(segment1);
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+ }
+
+ // Add the following util function since the comparison column is a numeric
(double) value for a TTL-enabled upsert table.
+ private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[]
primaryKeys, Number[] timestamps) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordInfoList.add(
+ new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new
Double(timestamps[i].doubleValue()), false));
+ }
+ return recordInfoList;
+ }
+
+ // Add the following util function since the comparison column is a numeric
(double) value for a TTL-enabled upsert table.
+ private static void checkRecordLocationForTTL(Map<Object, RecordLocation>
recordLocationMap, int keyValue,
+ IndexSegment segment, int docId, Number comparisonValue, HashFunction
hashFunction) {
+ RecordLocation recordLocation =
+
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue),
hashFunction));
+ assertNotNull(recordLocation);
+ assertSame(recordLocation.getSegment(), segment);
+ assertEquals(recordLocation.getDocId(), docId);
+ assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(),
comparisonValue.doubleValue());
+ }
+
+ private void verifyPersistAndLoadWatermark() {
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, 10, INDEX_DIR,
+ mock(ServerMetrics.class));
+
+ double currentTimeMs = System.currentTimeMillis();
+ upsertMetadataManager.persistWatermark(currentTimeMs);
+ assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION +
0).exists());
+
+ double watermark = upsertMetadataManager.loadWatermark();
+ assertEquals(watermark, currentTimeMs);
+ }
+
@Test
public void testHashPrimaryKey() {
PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 89f380d049..1f6834f62c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1810,6 +1810,91 @@ public class TableConfigUtilsTest {
}
}
+ @Test
+ public void testValidateTTLConfigForUpsertConfig() {
+ // Default comparison column (timestamp)
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setMetadataTTL(3600);
+ upsertConfig.setEnableSnapshot(true);
+ TableConfig tableConfigWithoutComparisonColumn =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setUpsertConfig(upsertConfig).build();
+
TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithoutComparisonColumn,
schema);
+
+ // Invalid comparison columns: "myCol"
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setComparisonColumns(Collections.singletonList("myCol"));
+ upsertConfig.setEnableSnapshot(true);
+ upsertConfig.setMetadataTTL(3600);
+ TableConfig tableConfigWithInvalidComparisonColumn =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setUpsertConfig(upsertConfig).build();
+ try {
+
TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithInvalidComparisonColumn,
schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+
+ // Invalid comparison columns: multiple comparison columns are not
supported for TTL-enabled upsert table.
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN,
"myCol"));
+ upsertConfig.setEnableSnapshot(true);
+ upsertConfig.setMetadataTTL(3600);
+ TableConfig tableConfigWithInvalidComparisonColumn2 =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setUpsertConfig(upsertConfig).build();
+ try {
+
TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithInvalidComparisonColumn2,
schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+
+ // Invalid config with TTLConfig but Snapshot is not enabled
+ schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setMetadataTTL(3600);
+ upsertConfig.setEnableSnapshot(false);
+ TableConfig tableConfigWithInvalidTTLConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setUpsertConfig(upsertConfig).build();
+ try {
+
TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithInvalidTTLConfig,
schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+
+ // Invalid config with both delete and TTL enabled
+ String delCol = "myDelCol";
+ schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .addSingleValueDimension(delCol, FieldSpec.DataType.STRING)
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setMetadataTTL(3600);
+ upsertConfig.setEnableSnapshot(true);
+ upsertConfig.setDeleteRecordColumn(delCol);
+ TableConfig tableConfigWithBothDeleteAndTTL =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setUpsertConfig(upsertConfig).build();
+ try {
+
TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithBothDeleteAndTTL,
schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+ }
+
@Test
public void testValidatePartitionedReplicaGroupInstance() {
String partitionColumn = "testPartitionCol";
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 38d2e21264..9f2c02fddd 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -26,6 +26,7 @@ public class V1Constants {
public static final String INDEX_MAP_FILE_NAME = "index_map";
public static final String INDEX_FILE_NAME = "columns.psf";
public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME =
"validdocids.bitmap.snapshot";
+ public static final String TTL_WATERMARK_TABLE_PARTITION =
"ttl.watermark.partition.";
public static class Str {
public static final char DEFAULT_STRING_PAD_CHAR = '\0';
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
index 58631531f3..05ab86628d 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
@@ -36,6 +36,10 @@ public class ThreadSafeMutableRoaringBitmap {
_mutableRoaringBitmap.add(firstDocId);
}
+ public ThreadSafeMutableRoaringBitmap(MutableRoaringBitmap
mutableRoaringBitmap) {
+ _mutableRoaringBitmap = mutableRoaringBitmap;
+ }
+
public synchronized void add(int docId) {
_mutableRoaringBitmap.add(docId);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index a9981de4ff..2e2a3f645a 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -60,6 +60,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to use snapshot for fast upsert metadata
recovery")
private boolean _enableSnapshot;
+ @JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it
uses the same unit as comparison col")
+ private double _metadataTTL;
+
@JsonPropertyDescription("Whether to preload segments for fast upsert
metadata recovery")
private boolean _enablePreload;
@@ -111,6 +114,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _enableSnapshot;
}
+ public double getMetadataTTL() {
+ return _metadataTTL;
+ }
+
public boolean isEnablePreload() {
return _enablePreload;
}
@@ -179,6 +186,10 @@ public class UpsertConfig extends BaseJsonConfig {
_enableSnapshot = enableSnapshot;
}
+ public void setMetadataTTL(double metadataTTL) {
+ _metadataTTL = metadataTTL;
+ }
+
public void setEnablePreload(boolean enablePreload) {
_enablePreload = enablePreload;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]