This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bd32dc9a64 Ensure upsert deletion consistency when enabled with
compaction flow (#13347)
bd32dc9a64 is described below
commit bd32dc9a642fbbdfef45e066ed00e97f66b80a5d
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Aug 16 04:03:42 2024 +0530
Ensure upsert deletion consistency when enabled with compaction flow
(#13347)
---
.../upsert/BaseTableUpsertMetadataManager.java | 6 +-
...nUpsertMetadataManagerForConsistentDeletes.java | 389 +++++++
.../ConcurrentMapTableUpsertMetadataManager.java | 12 +-
.../pinot/segment/local/upsert/UpsertContext.java | 30 +-
.../pinot/segment/local/upsert/UpsertUtils.java | 20 +
.../segment/local/utils/TableConfigUtils.java | 25 +
...ertMetadataManagerForConsistentDeletesTest.java | 1212 ++++++++++++++++++++
.../TableUpsertMetadataManagerFactoryTest.java | 60 +
.../segment/local/utils/TableConfigUtilsTest.java | 77 ++
.../pinot/spi/config/table/UpsertConfig.java | 11 +
10 files changed, 1834 insertions(+), 8 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 6d77bbc535..400519079b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -41,6 +41,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
protected UpsertContext _context;
protected UpsertConfig.ConsistencyMode _consistencyMode;
protected boolean _enablePreload;
+ protected boolean _enableDeletedKeysCompactionConsistency;
@Override
public void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager) {
@@ -71,6 +72,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
enableSnapshot && upsertConfig.isEnablePreload() &&
tableDataManager.getSegmentPreloadExecutor() != null;
double metadataTTL = upsertConfig.getMetadataTTL();
double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
+ _enableDeletedKeysCompactionConsistency =
upsertConfig.isEnableDeletedKeysCompactionConsistency();
_consistencyMode = upsertConfig.getConsistencyMode();
if (_consistencyMode == null) {
_consistencyMode = UpsertConfig.ConsistencyMode.NONE;
@@ -83,7 +85,9 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot)
.setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL)
.setConsistencyMode(_consistencyMode).setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs)
-
.setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager).build();
+
.setTableIndexDir(tableIndexDir).setDropOutOfOrderRecord(upsertConfig.isDropOutOfOrderRecord())
+
.setEnableDeletedKeysCompactionConsistency(_enableDeletedKeysCompactionConsistency)
+ .setTableDataManager(tableDataManager).build();
LOGGER.info(
"Initialized {} for table: {} with primary key columns: {}, comparison
columns: {}, delete record column: {},"
+ " hash function: {}, upsert mode: {}, enable snapshot: {},
enable preload: {}, metadata TTL: {},"
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
new file mode 100644
index 0000000000..7e8f510737
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.ServerMeter;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Implementation of {@link PartitionUpsertMetadataManager} that is backed by
a {@link ConcurrentHashMap} and ensures
+ * consistent deletions. This should be used when the table is configured with
'enableDeletedKeysCompactionConsistency' set to true.
+ *
+ * Consistent deletion ensures that when deletedKeysTTL is enabled with
UpsertCompaction, the key metadata is
+ * removed from the HashMap only after all other records in the old segments
are compacted. This guarantees
+ * data consistency. Without this, there can be a scenario where a deleted
record is compacted first, while an
+ * old record remains non-compacted in a previous segment. During a server
restart, this could lead to the old
+ * record reappearing. For the end-user, this would result in a data loss or
inconsistency scenario, as the
+ * record was marked for deletion.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@ThreadSafe
+public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
+ extends BasePartitionUpsertMetadataManager {
+
+ @VisibleForTesting
+ final ConcurrentHashMap<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+ _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ // Used to initialize a reference to previous row for merging in partial
upsert
+ private final LazyRow _reusePreviousRow = new LazyRow();
+ private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
+
+ public
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(String
tableNameWithType, int partitionId,
+ UpsertContext context) {
+ super(tableNameWithType, partitionId, context);
+ }
+
+ @Override
+ protected long getNumPrimaryKeys() {
+ return _primaryKeyToRecordLocationMap.size();
+ }
+
+ @Override
+ protected void doAddOrReplaceSegment(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
Iterator<RecordInfo> recordInfoIterator,
+ @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap
validDocIdsForOldSegment) {
+ String segmentName = segment.getSegmentName();
+ segment.enableUpsert(this, validDocIds, queryableDocIds);
+
+ AtomicInteger numKeysInWrongSegment = new AtomicInteger();
+ while (recordInfoIterator.hasNext()) {
+ RecordInfo recordInfo = recordInfoIterator.next();
+ int newDocId = recordInfo.getDocId();
+ Comparable newComparisonValue = recordInfo.getComparisonValue();
+
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
+ (primaryKey, currentRecordLocation) -> {
+ if (currentRecordLocation != null) {
+ // Existing primary key
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
+ int currentDocId = currentRecordLocation.getDocId();
+ Comparable currentComparisonValue =
currentRecordLocation.getComparisonValue();
+ int comparisonResult =
newComparisonValue.compareTo(currentComparisonValue);
+ int currentDistinctSegmentCount =
currentRecordLocation.getDistinctSegmentCount();
+
+ // The current record is in the same segment
+ // Update the record location when there is a tie to keep the
newer record. Note that the record info
+ // iterator will return records with incremental doc ids.
+ if (currentSegment == segment) {
+ if (comparisonResult >= 0) {
+ replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
+ return new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation(segment,
+ newDocId, newComparisonValue,
currentDistinctSegmentCount);
+ } else {
+ return currentRecordLocation;
+ }
+ }
+
+ // The current record is in an old segment being replaced
+ // This could happen when committing a consuming segment, or
reloading a completed segment. In this
+ // case, we want to update the record location when there is a
tie because the record locations should
+ // point to the new added segment instead of the old segment
being replaced. Also, do not update the valid
+ // doc ids for the old segment because it has not been replaced
yet. We pass in an optional valid doc ids
+ // snapshot for the old segment, which can be updated and used
to track the docs not replaced yet.
+ if (currentSegment == oldSegment) {
+ if (comparisonResult >= 0) {
+ addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ if (validDocIdsForOldSegment != null) {
+ validDocIdsForOldSegment.remove(currentDocId);
+ }
+ return new RecordLocation(segment, newDocId,
newComparisonValue,
+
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+ } else {
+ return new RecordLocation(currentSegment, currentDocId,
currentComparisonValue,
+
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+ }
+ }
+
+ // This should not happen because the previously replaced
segment should have all keys removed. We still
+ // handle it here, and also track the number of keys not
properly replaced previously.
+ String currentSegmentName = currentSegment.getSegmentName();
+ if (currentSegmentName.equals(segmentName)) {
+ numKeysInWrongSegment.getAndIncrement();
+ if (comparisonResult >= 0) {
+ addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ return new RecordLocation(segment, newDocId,
newComparisonValue,
+
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+ } else {
+ return currentRecordLocation;
+ }
+ }
+
+ // The current record is in a different segment
+ // Update the record location when getting a newer comparison
value, or the value is the same as the
+ // current value, but the segment has a larger sequence number
(the segment is newer than the current
+ // segment).
+ if (comparisonResult > 0 || (comparisonResult == 0 &&
shouldReplaceOnComparisonTie(segmentName,
+ currentSegmentName,
segment.getSegmentMetadata().getIndexCreationTime(),
+
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
+ replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId,
newComparisonValue,
+
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+ } else {
+ return new RecordLocation(currentSegment, currentDocId,
currentComparisonValue,
+
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+ }
+ } else {
+ // New primary key
+ addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue,
1);
+ }
+ });
+ }
+ int numKeys = numKeysInWrongSegment.get();
+ if (numKeys > 0) {
+ _logger.warn("Found {} primary keys in the wrong segment when adding
segment: {}", numKeys, segmentName);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
+ }
+ }
+
+ @Override
+ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
Iterator<RecordInfo> recordInfoIterator) {
+ throw new UnsupportedOperationException("Consistent-deletion does not
support preloading of segments.");
+ }
+
+ @Override
+ protected void doRemoveSegment(IndexSegment segment) {
+ String segmentName = segment.getSegmentName();
+ _logger.info("Removing {} segment: {}, current primary key count: {}",
+ segment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, getNumPrimaryKeys());
+ long startTimeMs = System.currentTimeMillis();
+
+ try (
+ UpsertUtils.PrimaryKeyReader primaryKeyReader = new
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
+ removeSegment(segment,
+ UpsertUtils.getPrimaryKeyIterator(primaryKeyReader,
segment.getSegmentMetadata().getTotalDocs()));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while removing segment: %s, table:
%s", segment.getSegmentName(),
+ _tableNameWithType), e);
+ }
+
+ // Update metrics
+ long numPrimaryKeys = getNumPrimaryKeys();
+ updatePrimaryKeyGauge(numPrimaryKeys);
+ _logger.info("Finished removing segment: {} in {}ms, current primary key
count: {}", segmentName,
+ System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+ }
+
+ @Override
+ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey>
primaryKeyIterator) {
+ // We need to decrease the distinctSegmentCount for each unique primary
key in this deleting segment by 1
+ // as the occurrence of the key in this segment is being removed. We are
taking a set of unique primary keys
+ // to avoid double counting the same key in the same segment.
+ Set<Object> uniquePrimaryKeys = new HashSet<>();
+ while (primaryKeyIterator.hasNext()) {
+ PrimaryKey primaryKey = primaryKeyIterator.next();
+
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
_hashFunction),
+ (pk, recordLocation) -> {
+ if (recordLocation.getSegment() == segment) {
+ return null;
+ }
+ if (!uniquePrimaryKeys.add(pk)) {
+ return recordLocation;
+ }
+ return new RecordLocation(recordLocation.getSegment(),
recordLocation.getDocId(),
+ recordLocation.getComparisonValue(),
+
RecordLocation.decrementSegmentCount(recordLocation.getDistinctSegmentCount()));
+ });
+ }
+ }
+
+ @Override
+ public void doRemoveExpiredPrimaryKeys() {
+ AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
+ double largestSeenComparisonValue = _largestSeenComparisonValue.get();
+ double deletedKeysThreshold;
+ if (_deletedKeysTTL > 0) {
+ deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
+ } else {
+ deletedKeysThreshold = Double.MIN_VALUE;
+ }
+
+ _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+ double comparisonValue = ((Number)
recordLocation.getComparisonValue()).doubleValue();
+ // We need to verify that the record belongs to only one segment. If a
record is part of multiple segments,
+ // an issue can arise where the upsert compaction might first process
the segment containing the delete record
+ // while the previous segment(s) are not compacted. Upon restart, this
can inadvertently revive the key
+ // that was originally marked for deletion.
+ if (_deletedKeysTTL > 0 && comparisonValue < deletedKeysThreshold
+ && recordLocation.getDistinctSegmentCount() <= 1) {
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
recordLocation.getSegment().getQueryableDocIds();
+ // if key not part of queryable doc id, it means it is deleted
+ if (currentQueryableDocIds != null &&
!currentQueryableDocIds.contains(recordLocation.getDocId())) {
+ _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ removeDocId(recordLocation.getSegment(), recordLocation.getDocId());
+ numDeletedTTLKeysRemoved.getAndIncrement();
+ }
+ }
+ });
+
+ // Update metrics
+ updatePrimaryKeyGauge();
+ int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
+ if (numDeletedTTLKeys > 0) {
+ _logger.info("Deleted {} primary keys based on deletedKeysTTL",
numDeletedTTLKeys);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
+ numDeletedTTLKeys);
+ }
+ }
+
+ @Override
+ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo)
{
+ AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
+ ThreadSafeMutableRoaringBitmap validDocIds =
Objects.requireNonNull(segment.getValidDocIds());
+ ThreadSafeMutableRoaringBitmap queryableDocIds =
segment.getQueryableDocIds();
+ int newDocId = recordInfo.getDocId();
+ Comparable newComparisonValue = recordInfo.getComparisonValue();
+
+ // When TTL is enabled, update largestSeenComparisonValue when adding new
record
+ if (_deletedKeysTTL > 0) {
+ double comparisonValue = ((Number) newComparisonValue).doubleValue();
+ _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
comparisonValue));
+ }
+
+
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
+ (primaryKey, currentRecordLocation) -> {
+ if (currentRecordLocation != null) {
+ // Existing primary key
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
+ // Update the record location when the new comparison value is
greater than or equal to the current value.
+ // Update the record location when there is a tie to keep the
newer record.
+ if
(newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0)
{
+ int currentDocId = currentRecordLocation.getDocId();
+ if (segment == currentSegment) {
+ replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId,
newComparisonValue,
+ currentRecordLocation.getDistinctSegmentCount());
+ } else {
+ replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId,
newComparisonValue,
+
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
+ }
+ } else {
+ // Out-of-order record
+
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(),
recordInfo.getComparisonValue());
+ isOutOfOrderRecord.set(true);
+ if (segment == currentSegment) {
+ return currentRecordLocation;
+ } else {
+ return new RecordLocation(currentSegment,
currentRecordLocation.getDocId(),
+ currentRecordLocation.getComparisonValue(),
+ _context.isDropOutOfOrderRecord() ?
currentRecordLocation.getDistinctSegmentCount()
+ :
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
+ }
+ }
+ } else {
+ // New primary key
+ addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue,
1);
+ }
+ });
+
+ updatePrimaryKeyGauge();
+ return !isOutOfOrderRecord.get();
+ }
+
+ @Override
+ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo
recordInfo) {
+ assert _partialUpsertHandler != null;
+
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
+ (pk, recordLocation) -> {
+ // Read the previous record if the following conditions are met:
+ // - New record is not a DELETE record
+ // - New record is not out-of-order
+ // - Previous record is not deleted
+ if (!recordInfo.isDeleteRecord()
+ &&
recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue())
>= 0) {
+ IndexSegment currentSegment = recordLocation.getSegment();
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
currentSegment.getQueryableDocIds();
+ int currentDocId = recordLocation.getDocId();
+ if (currentQueryableDocIds == null ||
currentQueryableDocIds.contains(currentDocId)) {
+ _reusePreviousRow.init(currentSegment, currentDocId);
+ _partialUpsertHandler.merge(_reusePreviousRow, record,
_reuseMergeResultHolder);
+ _reuseMergeResultHolder.clear();
+ }
+ }
+ return recordLocation;
+ });
+ return record;
+ }
+
+ @VisibleForTesting
+ static class RecordLocation {
+ private final IndexSegment _segment;
+ private final int _docId;
+ private final Comparable _comparisonValue;
+ // The number of distinct segments in which the record is present. If this
count is less than or equal to 1,
+ // we proceed to remove the record from the primary hashmap during the
deletedKeysTTL process.
+ private final int _distinctSegmentCount;
+
+ public RecordLocation(IndexSegment indexSegment, int docId, Comparable
comparisonValue, int distinctSegmentCount) {
+ _segment = indexSegment;
+ _docId = docId;
+ _comparisonValue = comparisonValue;
+ _distinctSegmentCount = distinctSegmentCount;
+ }
+
+ public static int incrementSegmentCount(int count) {
+ return count + 1;
+ }
+
+ public static int decrementSegmentCount(int count) {
+ return count - 1;
+ }
+
+ public IndexSegment getSegment() {
+ return _segment;
+ }
+
+ public int getDocId() {
+ return _docId;
+ }
+
+ public Comparable getComparisonValue() {
+ return _comparisonValue;
+ }
+
+ public int getDistinctSegmentCount() {
+ return _distinctSegmentCount;
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 7b216acfc3..45a6487a82 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -35,18 +35,20 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
*/
@ThreadSafe
public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMetadataManager {
- private final Map<Integer, ConcurrentMapPartitionUpsertMetadataManager>
_partitionMetadataManagerMap =
+ private final Map<Integer, BasePartitionUpsertMetadataManager>
_partitionMetadataManagerMap =
new ConcurrentHashMap<>();
@Override
- public ConcurrentMapPartitionUpsertMetadataManager
getOrCreatePartitionManager(int partitionId) {
+ public BasePartitionUpsertMetadataManager getOrCreatePartitionManager(int
partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
- k -> new
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context));
+ k -> _enableDeletedKeysCompactionConsistency
+ ? new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(_tableNameWithType,
k, _context)
+ : new
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context));
}
@Override
public void stop() {
- for (ConcurrentMapPartitionUpsertMetadataManager metadataManager :
_partitionMetadataManagerMap.values()) {
+ for (BasePartitionUpsertMetadataManager metadataManager :
_partitionMetadataManagerMap.values()) {
metadataManager.stop();
}
}
@@ -82,7 +84,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMeta
@Override
public void close()
throws IOException {
- for (ConcurrentMapPartitionUpsertMetadataManager metadataManager :
_partitionMetadataManagerMap.values()) {
+ for (BasePartitionUpsertMetadataManager metadataManager :
_partitionMetadataManagerMap.values()) {
metadataManager.close();
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index 837d94a963..69323c0e65 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -45,13 +45,16 @@ public class UpsertContext {
private final UpsertConfig.ConsistencyMode _consistencyMode;
private final long _upsertViewRefreshIntervalMs;
private final File _tableIndexDir;
+ private final boolean _dropOutOfOrderRecord;
+ private final boolean _enableDeletedKeysCompactionConsistency;
private final TableDataManager _tableDataManager;
private UpsertContext(TableConfig tableConfig, Schema schema, List<String>
primaryKeyColumns,
List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction,
@Nullable PartialUpsertHandler partialUpsertHandler, boolean
enableSnapshot, boolean enablePreload,
double metadataTTL, double deletedKeysTTL, UpsertConfig.ConsistencyMode
consistencyMode,
- long upsertViewRefreshIntervalMs, File tableIndexDir, @Nullable
TableDataManager tableDataManager) {
+ long upsertViewRefreshIntervalMs, File tableIndexDir, boolean
dropOutOfOrderRecord,
+ boolean enableDeletedKeysCompactionConsistency, @Nullable
TableDataManager tableDataManager) {
_tableConfig = tableConfig;
_schema = schema;
_primaryKeyColumns = primaryKeyColumns;
@@ -66,6 +69,8 @@ public class UpsertContext {
_consistencyMode = consistencyMode;
_upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs;
_tableIndexDir = tableIndexDir;
+ _dropOutOfOrderRecord = dropOutOfOrderRecord;
+ _enableDeletedKeysCompactionConsistency =
enableDeletedKeysCompactionConsistency;
_tableDataManager = tableDataManager;
}
@@ -125,6 +130,14 @@ public class UpsertContext {
return _tableIndexDir;
}
+ public boolean isDropOutOfOrderRecord() {
+ return _dropOutOfOrderRecord;
+ }
+
+ public boolean isEnableDeletedKeysCompactionConsistency() {
+ return _enableDeletedKeysCompactionConsistency;
+ }
+
public TableDataManager getTableDataManager() {
return _tableDataManager;
}
@@ -144,6 +157,8 @@ public class UpsertContext {
private UpsertConfig.ConsistencyMode _consistencyMode;
private long _upsertViewRefreshIntervalMs;
private File _tableIndexDir;
+ private boolean _dropOutOfOrderRecord;
+ private boolean _enableDeletedKeysCompactionConsistency;
private TableDataManager _tableDataManager;
public Builder setTableConfig(TableConfig tableConfig) {
@@ -216,6 +231,16 @@ public class UpsertContext {
return this;
}
+ public Builder setDropOutOfOrderRecord(boolean dropOutOfOrderRecord) {
+ _dropOutOfOrderRecord = dropOutOfOrderRecord;
+ return this;
+ }
+
+ public Builder setEnableDeletedKeysCompactionConsistency(boolean
enableDeletedKeysCompactionConsistency) {
+ _enableDeletedKeysCompactionConsistency =
enableDeletedKeysCompactionConsistency;
+ return this;
+ }
+
public Builder setTableDataManager(TableDataManager tableDataManager) {
_tableDataManager = tableDataManager;
return this;
@@ -230,7 +255,8 @@ public class UpsertContext {
Preconditions.checkState(_tableIndexDir != null, "Table index directory
must be set");
return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn,
_hashFunction, _partialUpsertHandler, _enableSnapshot,
_enablePreload, _metadataTTL, _deletedKeysTTL,
- _consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir,
_tableDataManager);
+ _consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir,
_dropOutOfOrderRecord,
+ _enableDeletedKeysCompactionConsistency, _tableDataManager);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 6484c354f9..47d6a72743 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -108,6 +108,26 @@ public class UpsertUtils {
};
}
+ /**
+ * Returns an iterator of {@link PrimaryKey} for all the documents from the
segment.
+ */
+ public static Iterator<PrimaryKey> getPrimaryKeyIterator(PrimaryKeyReader
primaryKeyReader,
+ int numDocs) {
+ return new Iterator<>() {
+ private int _docId = 0;
+
+ @Override
+ public boolean hasNext() {
+ return _docId < numDocs;
+ }
+
+ @Override
+ public PrimaryKey next() {
+ return primaryKeyReader.getPrimaryKey(_docId++);
+ }
+ };
+ }
+
public static class RecordInfoReader implements Closeable {
private final PrimaryKeyReader _primaryKeyReader;
private final ComparisonColumnReader _comparisonColumnReader;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 16597eebd5..54460fa47c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -858,6 +858,31 @@ public final class TableConfigUtils {
}
}
+
+ if (upsertConfig != null &&
upsertConfig.isEnableDeletedKeysCompactionConsistency()) {
+ // enableDeletedKeysCompactionConsistency shouldn't exist with
metadataTTL
+ Preconditions.checkState(upsertConfig.getMetadataTTL() == 0,
+ "enableDeletedKeysCompactionConsistency and metadataTTL shouldn't
exist together for upsert table");
+
+ // enableDeletedKeysCompactionConsistency shouldn't exist with
enablePreload
+ Preconditions.checkState(!upsertConfig.isEnablePreload(),
+ "enableDeletedKeysCompactionConsistency and enablePreload shouldn't
exist together for upsert table");
+
+ // enableDeletedKeysCompactionConsistency should exist with
deletedKeysTTL
+ Preconditions.checkState(upsertConfig.getDeletedKeysTTL() > 0,
+ "enableDeletedKeysCompactionConsistency should exist with
deletedKeysTTL for upsert table");
+
+ // enableDeletedKeysCompactionConsistency should exist with
enableSnapshot
+ Preconditions.checkState(upsertConfig.isEnableSnapshot(),
+ "enableDeletedKeysCompactionConsistency should exist with
enableSnapshot for upsert table");
+
+ // enableDeletedKeysCompactionConsistency should exist with
UpsertCompactionTask
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ Preconditions.checkState(taskConfig != null
+ &&
taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACTION_TASK_TYPE),
+ "enableDeletedKeysCompactionConsistency should exist with
UpsertCompactionTask for upsert table");
+ }
+
Preconditions.checkState(
tableConfig.getInstanceAssignmentConfigMap() == null ||
!tableConfig.getInstanceAssignmentConfigMap()
.containsKey(InstancePartitionsType.COMPLETED),
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
new file mode 100644
index 0000000000..4a6ab831cb
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -0,0 +1,1212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
+import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.mockito.MockedConstruction;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+/**
+ * Replicates the behavior of {@code ConcurrentMapPartitionUpsertMetadataManagerTest} with
+ * _enableConsistentDeletes enabled; all the required params are set accordingly in
+ * {@code setUpContextBuilder}. The preload and metadataTTL unit tests are omitted for now
+ * because those options are not allowed together with _enableConsistentDeletes.
+ */
+public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest {
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final List<String> PRIMARY_KEY_COLUMNS =
Collections.singletonList("pk");
+ private static final List<String> COMPARISON_COLUMNS =
Collections.singletonList("timeCol");
+ private static final String DELETE_RECORD_COLUMN = "deleteCol";
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
"ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest");
+
+ private UpsertContext.Builder _contextBuilder;
+
+ private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys) {
+ ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+ when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+ DataSource dataSource = mock(DataSource.class);
+ when(segment.getDataSource(anyString())).thenReturn(dataSource);
+ ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+ when(forwardIndex.isSingleValue()).thenReturn(true);
+ when(forwardIndex.getStoredType()).thenReturn(FieldSpec.DataType.INT);
+ when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
+ invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
+ when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ return segment;
+ }
+
+ private static ImmutableSegmentImpl mockUploadedImmutableSegment(String
suffix,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, Long creationTimeMs) {
+ if (creationTimeMs == null) {
+ creationTimeMs = System.currentTimeMillis();
+ }
+ ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+
when(segment.getSegmentName()).thenReturn(getUploadedRealtimeSegmentName(creationTimeMs,
suffix));
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+ DataSource dataSource = mock(DataSource.class);
+ when(segment.getDataSource(anyString())).thenReturn(dataSource);
+ ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+ when(forwardIndex.isSingleValue()).thenReturn(true);
+ when(forwardIndex.getStoredType()).thenReturn(FieldSpec.DataType.INT);
+ when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
+ invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
+ when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ return segment;
+ }
+
+ private static ImmutableSegmentImpl
mockImmutableSegmentWithSegmentMetadata(int sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata,
MutableRoaringBitmap snapshot) {
+ ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber,
validDocIds, queryableDocIds, primaryKeys);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
+ return segment;
+ }
+
+ private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
+ return new EmptyIndexSegment(segmentMetadata);
+ }
+
+ private static MutableSegment mockMutableSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ MutableSegment segment = mock(MutableSegment.class);
+ when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+ return segment;
+ }
+
+ private static String getSegmentName(int sequenceNumber) {
+ return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
+ }
+
+ private static String getUploadedRealtimeSegmentName(long creationTimeMs,
String suffix) {
+ return new UploadedRealtimeSegmentName(RAW_TABLE_NAME, 0, creationTimeMs,
"uploaded", suffix).toString();
+ }
+
+ private static PrimaryKey makePrimaryKey(int value) {
+ return new PrimaryKey(new Object[]{value});
+ }
+
+ private static void checkRecordLocation(
+ Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
recordLocationMap,
+ int keyValue, IndexSegment segment, int docId, int comparisonValue,
HashFunction hashFunction) {
+
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation
recordLocation =
+
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue),
hashFunction));
+ assertNotNull(recordLocation);
+ assertSame(recordLocation.getSegment(), segment);
+ assertEquals(recordLocation.getDocId(), docId);
+ assertEquals(((Integer) recordLocation.getComparisonValue()),
comparisonValue);
+ }
+
  @BeforeClass
  public void setUp()
      throws IOException {
    // Create the temp index dir used by snapshot files, and register a no-op ServerMetrics so that
    // metric emission inside the metadata manager does not NPE during the tests.
    FileUtils.forceMkdir(INDEX_DIR);
    ServerMetrics.register(mock(ServerMetrics.class));
  }
+
  @BeforeMethod
  public void setUpContextBuilder() {
    // Consistent-deletes mode mandates this exact combination: enableDeletedKeysCompactionConsistency
    // together with enableSnapshot, a delete-record column, and a positive deletedKeysTTL
    // (preload and metadataTTL are disallowed with it, so they are not set here).
    _contextBuilder = new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
        .setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS).setComparisonColumns(COMPARISON_COLUMNS)
        .setEnableDeletedKeysCompactionConsistency(true).setTableIndexDir(INDEX_DIR).setEnableSnapshot(true)
        .setDeleteRecordColumn(DELETE_RECORD_COLUMN).setDeletedKeysTTL(20);
  }
+
  @AfterClass
  public void tearDown()
      throws IOException {
    // Remove the temp index dir created in setUp().
    FileUtils.forceDelete(INDEX_DIR);
  }
+
  /**
   * Verifies the operation lifecycle: close() must block until every started operation has
   * finished, and startOperation() must fail once the manager has been stopped.
   */
  @Test
  public void testStartFinishOperation() {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.build());

    // Start 2 operations
    assertTrue(upsertMetadataManager.startOperation());
    assertTrue(upsertMetadataManager.startOperation());

    // Stop and close the metadata manager on a background thread so this thread can observe that
    // close() blocks while operations are pending.
    AtomicBoolean stopped = new AtomicBoolean();
    AtomicBoolean closed = new AtomicBoolean();
    // Avoid early finalization by not using Executors.newSingleThreadExecutor (java <= 20, JDK-8145304)
    ExecutorService executor = Executors.newFixedThreadPool(1);
    executor.submit(() -> {
      upsertMetadataManager.stop();
      stopped.set(true);
      try {
        upsertMetadataManager.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      closed.set(true);
    });
    executor.shutdown();

    // Wait for metadata manager to be stopped
    TestUtils.waitForCondition(aVoid -> stopped.get(), 10_000L, "Failed to stop the metadata manager");

    // Metadata manager should block on close because there are 2 pending operations
    assertFalse(closed.get());

    // Starting new operation should fail because the metadata manager is already stopped
    assertFalse(upsertMetadataManager.startOperation());

    // Finish one operation
    upsertMetadataManager.finishOperation();

    // Metadata manager should still block on close because there is still 1 pending operation
    assertFalse(closed.get());

    // Finish the other operation
    upsertMetadataManager.finishOperation();

    // Metadata manager should be closed now
    TestUtils.waitForCondition(aVoid -> closed.get(), 10_000L, "Failed to close the metadata manager");
  }
+
+ @Test
+ public void testAddReplaceRemoveSegment()
+ throws IOException {
+ verifyAddReplaceRemoveSegment(HashFunction.NONE);
+ verifyAddReplaceRemoveSegment(HashFunction.MD5);
+ verifyAddReplaceRemoveSegment(HashFunction.MURMUR3);
+ }
+
  /**
   * Table-driven check of queryable-doc-id derivation: for a set of valid docIds and per-doc delete
   * flags, the queryable set must be the valid docs minus those flagged as deleted.
   */
  @Test
  public void testGetQueryableDocIds() {
    // Mixed case: docs 3 and 4 are delete records, so of the valid docs {2, 4, 5} only {2, 5} are queryable.
    boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, false};
    int[] docIds1 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
    validDocIdsSnapshot1.add(docIds1);
    MutableRoaringBitmap queryableDocIds1 = new MutableRoaringBitmap();
    queryableDocIds1.add(new int[]{2, 5});
    verifyGetQueryableDocIds(false, deleteFlags1, validDocIdsSnapshot1, queryableDocIds1);

    // all records are not deleted
    boolean[] deleteFlags2 = new boolean[]{false, false, false, false, false, false};
    int[] docIds2 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
    validDocIdsSnapshot2.add(docIds2);
    MutableRoaringBitmap queryableDocIds2 = new MutableRoaringBitmap();
    queryableDocIds2.add(docIds2);
    verifyGetQueryableDocIds(false, deleteFlags2, validDocIdsSnapshot2, queryableDocIds2);

    // delete column has null values
    boolean[] deleteFlags3 = new boolean[]{false, false, false, false, false, false};
    int[] docIds3 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot3 = new MutableRoaringBitmap();
    validDocIdsSnapshot3.add(docIds3);
    MutableRoaringBitmap queryableDocIds3 = new MutableRoaringBitmap();
    queryableDocIds3.add(docIds3);
    verifyGetQueryableDocIds(true, deleteFlags3, validDocIdsSnapshot3, queryableDocIds3);

    // All records are deleted record.
    boolean[] deleteFlags4 = new boolean[]{true, true, true, true, true, true};
    int[] docIds4 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot4 = new MutableRoaringBitmap();
    validDocIdsSnapshot4.add(docIds4);
    MutableRoaringBitmap queryableDocIds4 = new MutableRoaringBitmap();
    verifyGetQueryableDocIds(false, deleteFlags4, validDocIdsSnapshot4, queryableDocIds4);
  }
+
  /**
   * End-to-end add/replace/remove flow under the given hash function: adds two overlapping
   * segments (built from validDocId snapshots) plus an empty segment, reloads segment1, then
   * removes segments in turn, asserting the primary-key -> location map and per-segment valid
   * docIds after each step. After stop(), removeSegment must be a no-op.
   */
  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;
    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;

    // Add the first segment
    int numRecords = 6;
    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
    SegmentMetadataImpl segmentMetadata1 = mock(SegmentMetadataImpl.class);
    when(segmentMetadata1.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
    when(segmentMetadata1.getTotalDocs()).thenReturn(numRecords);
    ImmutableSegmentImpl segment1 =
        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, primaryKeys1, segmentMetadata1, null);
    List<RecordInfo> recordInfoList1;
    // get recordInfo from validDocIdSnapshot.
    // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    int[] docIds1 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
    validDocIdsSnapshot1.add(docIds1);
    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
    trackedSegments.add(segment1);
    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    assertEquals(recordLocationMap.size(), 3);
    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});

    // Add the second segment
    numRecords = 5;
    primaryKeys = new int[]{0, 1, 2, 3, 0};
    timestamps = new int[]{100, 100, 120, 80, 80};
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    SegmentMetadataImpl segmentMetadata2 = mock(SegmentMetadataImpl.class);
    when(segmentMetadata2.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
    when(segmentMetadata2.getTotalDocs()).thenReturn(numRecords);
    ImmutableSegmentImpl segment2 =
        mockImmutableSegmentWithSegmentMetadata(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys),
            segmentMetadata2, null);
    List<RecordInfo> recordInfoList2;
    // get recordInfo from validDocIdSnapshot.
    // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // segment1 snapshot: 1 -> {4, 120}
    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
    validDocIdsSnapshot2.add(0, 2, 3);
    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(segment2, validDocIds2, null, recordInfoList2.iterator());
    trackedSegments.add(segment2);

    // segment1: 1 -> {4, 120}
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});

    // Add an empty segment: must not change any state
    EmptyIndexSegment emptySegment = mockEmptySegment(3);
    upsertMetadataManager.addSegment(emptySegment);
    // segment1: 1 -> {4, 120}
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});

    // Replace (reload) the first segment
    ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
    SegmentMetadataImpl newSegmentMetadata1 = mock(SegmentMetadataImpl.class);
    when(newSegmentMetadata1.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
    when(newSegmentMetadata1.getTotalDocs()).thenReturn(primaryKeys1.size());
    ImmutableSegmentImpl newSegment1 =
        mockImmutableSegmentWithSegmentMetadata(1, newValidDocIds1, null, primaryKeys1, newSegmentMetadata1, null);
    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, null, recordInfoList1.iterator(), segment1);
    trackedSegments.add(newSegment1);
    trackedSegments.remove(segment1);
    // original segment1: 1 -> {4, 120} (not in the map)
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});

    // Remove the original segment1: its entries were already re-pointed, so the map is unchanged
    upsertMetadataManager.removeSegment(segment1);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});

    // Remove the empty segment
    upsertMetadataManager.removeSegment(emptySegment);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});

    // Remove segment2: its keys drop out of the map entirely
    upsertMetadataManager.removeSegment(segment2);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 1);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(trackedSegments, Collections.singleton(newSegment1));

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Remove new segment1, should be no-op
    upsertMetadataManager.removeSegment(newSegment1);
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 1);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(trackedSegments, Collections.singleton(newSegment1));

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+ @Test
+ public void testAddReplaceRemoveSegmentWithRecordDelete()
+ throws IOException {
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3);
+ }
+
  /**
   * Exercises conflict resolution between an LLC segment and uploaded-realtime segments: when
   * comparison values tie, the segment with the higher creation time (and, on a creation-time tie,
   * the uploaded/later segment) wins ownership of the primary key. Uses HashFunction.NONE only.
   */
  @Test
  public void verifyAddReplaceUploadedSegment1()
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(HashFunction.NONE).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;
    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;

    // Add the first segment (creation time 1000)
    int numRecords = 6;
    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
    ImmutableSegmentImpl segment1 =
        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, primaryKeys1, segmentMetadata, null);
    List<RecordInfo> recordInfoList1;
    // get recordInfo by iterating all records.
    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
    trackedSegments.add(segment1);
    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    assertEquals(recordLocationMap.size(), 3);
    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});

    // Add the second segment of uploaded name format with same creation time
    numRecords = 2;
    primaryKeys = new int[]{0, 3};
    timestamps = new int[]{100, 80};
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl uploadedSegment2 =
        mockUploadedImmutableSegment("2", validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1000L);
    List<RecordInfo> recordInfoList2;
    // get recordInfo by iterating all records.
    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, recordInfoList2.iterator());
    trackedSegments.add(uploadedSegment2);

    // PK 0 ties on comparison value (100); the uploaded segment wins on the creation-time tie.
    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
    // uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // replace uploadedSegment2 (new copy has a later creation time, 1020)
    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl newUploadedSegment2 =
        mockUploadedImmutableSegment("2", newValidDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1020L);
    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, null, recordInfoList2.iterator(),
        uploadedSegment2);
    trackedSegments.add(newUploadedSegment2);
    trackedSegments.remove(uploadedSegment2);

    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
    // newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4});
    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // add uploadedSegment3 with higher creation time than newUploadedSegment2
    numRecords = 1;
    primaryKeys = new int[]{0};
    timestamps = new int[]{100};
    ThreadSafeMutableRoaringBitmap validDocIds3 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl uploadedSegment3 =
        mockUploadedImmutableSegment("3", validDocIds3, null, getPrimaryKeyList(numRecords, primaryKeys), 1040L);
    List<RecordInfo> recordInfoList3;
    // get recordInfo by iterating all records.
    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, recordInfoList3.iterator());

    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
    // newUploadedSegment2: 3 -> {1, 80}
    // uploadedSegment3: 0 -> {0, 100}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4});
    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new int[]{0});

    // add uploadedSegment4 with higher creation time than segment 1 and same creation time as uploadedSegment3
    numRecords = 2;
    primaryKeys = new int[]{0, 1};
    timestamps = new int[]{100, 120};
    ThreadSafeMutableRoaringBitmap validDocIds4 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl uploadedSegment4 =
        mockUploadedImmutableSegment("4", validDocIds4, null, getPrimaryKeyList(numRecords, primaryKeys), 1040L);
    List<RecordInfo> recordInfoList4;
    // get recordInfo by iterating all records.
    recordInfoList4 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(uploadedSegment4, validDocIds4, null, recordInfoList4.iterator());

    // segment1: 2 -> {2, 100}
    // newUploadedSegment2: 3 -> {1, 80}
    // uploadedSegment3: 0 -> {0, 100}
    // uploadedSegment4: 1 -> {1, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2});
    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(validDocIds4.getMutableRoaringBitmap().toArray(), new int[]{1});

    // remove segments
    upsertMetadataManager.removeSegment(segment1);
    upsertMetadataManager.removeSegment(uploadedSegment2);
    upsertMetadataManager.removeSegment(newUploadedSegment2);
    upsertMetadataManager.removeSegment(uploadedSegment3);
    upsertMetadataManager.removeSegment(uploadedSegment4);

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
  /**
   * Verifies the full segment lifecycle (add, add-empty, replace/reload, remove, stop) for the
   * consistent-deletes upsert manager when segments are registered from validDocIds snapshots
   * (the compaction flow) and records carry delete markers.
   *
   * Notation in the inline comments: primaryKey -> {docId, comparisonValue}.
   */
  private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;
    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;

    // Add the first segment
    int numRecords = 6;
    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
    boolean[] deleteFlags = new boolean[]{false, false, false, true, true, false};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, queryableDocIds1, primaryKeys1);
    List<RecordInfo> recordInfoList1;
    // get recordInfo from validDocIdSnapshot (only docIds present in the snapshot are replayed).
    // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    int[] docIds1 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
    validDocIdsSnapshot1.add(docIds1);
    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, deleteFlags);
    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
    trackedSegments.add(segment1);
    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    assertEquals(recordLocationMap.size(), 3);
    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
    // docId 4 carries a delete marker, so it is valid but not queryable
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});

    // Add the second segment
    numRecords = 5;
    primaryKeys = new int[]{0, 1, 2, 3, 0};
    timestamps = new int[]{100, 100, 120, 80, 80};
    deleteFlags = new boolean[]{false, true, true, false, false};
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
    SegmentMetadataImpl segmentMetadata2 = mock(SegmentMetadataImpl.class);
    when(segmentMetadata2.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
    when(segmentMetadata2.getTotalDocs()).thenReturn(primaryKeys.length);
    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithSegmentMetadata(2, validDocIds2, queryableDocIds2,
        getPrimaryKeyList(numRecords, primaryKeys), segmentMetadata2, null);
    List<RecordInfo> recordInfoList2;
    // get recordInfo from validDocIdSnapshot.
    // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // segment1 snapshot: 1 -> {4, 120}
    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
    validDocIdsSnapshot2.add(0, 2, 3);
    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, deleteFlags);
    upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, recordInfoList2.iterator());
    trackedSegments.add(segment2);

    // segment1: 1 -> {4, 120}
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});

    // Add an empty segment: must be a no-op on the metadata
    EmptyIndexSegment emptySegment = mockEmptySegment(3);
    upsertMetadataManager.addSegment(emptySegment);
    // segment1: 1 -> {4, 120}
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});

    // Replace (reload) the first segment
    ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap newQueryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, newQueryableDocIds1, primaryKeys1);
    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, newQueryableDocIds1, recordInfoList1.iterator(),
        segment1);
    trackedSegments.add(newSegment1);
    trackedSegments.remove(segment1);

    // original segment1: 1 -> {4, 120} (not in the map)
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Remove the original segment1: locations already point to newSegment1, so the map is unchanged
    upsertMetadataManager.removeSegment(segment1);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Remove the empty segment: no-op
    upsertMetadataManager.removeSegment(emptySegment);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Remove segment2: its primary keys drop out of the map
    upsertMetadataManager.removeSegment(segment2);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 1);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(trackedSegments, Collections.singleton(newSegment1));
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Remove new segment1, should be no-op
    upsertMetadataManager.removeSegment(newSegment1);
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 1);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(trackedSegments, Collections.singleton(newSegment1));
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+ private List<RecordInfo> getRecordInfoList(int numRecords, int[]
primaryKeys, int[] timestamps,
+ @Nullable boolean[] deleteRecordFlags) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i,
timestamps[i],
+ deleteRecordFlags != null && deleteRecordFlags[i]));
+ }
+ return recordInfoList;
+ }
+
+ private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[]
primaryKeys, int[] timestamps,
+ @Nullable boolean[] deleteRecordFlags) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i,
timestamps[i],
+ deleteRecordFlags != null && deleteRecordFlags[i]));
+ }
+ return recordInfoList;
+ }
+
+ /**
+ * Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
+ */
+ private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap
validDocIdsSnapshot, int[] primaryKeys,
+ int[] timestamps, @Nullable boolean[] deleteRecordFlags) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ Iterator<java.lang.Integer> validDocIdsIterator =
validDocIdsSnapshot.iterator();
+ validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add(
+ new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId,
timestamps[docId],
+ deleteRecordFlags != null && deleteRecordFlags[docId])));
+ return recordInfoList;
+ }
+
+ private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[]
primaryKeys) {
+ List<PrimaryKey> primaryKeyList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ primaryKeyList.add(makePrimaryKey(primaryKeys[i]));
+ }
+ return primaryKeyList;
+ }
+
+ @Test
+ public void testAddRecord()
+ throws IOException {
+ verifyAddRecord(HashFunction.NONE);
+ verifyAddRecord(HashFunction.MD5);
+ verifyAddRecord(HashFunction.MURMUR3);
+ }
+
  /**
   * Verifies ingestion-time addRecord() against an existing immutable segment: newer comparison
   * values (and equal ones) move the key to the consuming segment; older ones are dropped; after
   * stop() addRecord() is a no-op.
   *
   * Notation in the inline comments: primaryKey -> {docId, comparisonValue}.
   */
  private void verifyAddRecord(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;

    // Add the first segment
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    int numRecords = 3;
    int[] primaryKeys = new int[]{0, 1, 2};
    int[] timestamps = new int[]{100, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl segment1 =
        mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());

    // Update records from the second segment
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
    // New primary key: simply tracked in the consuming segment
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    // segment2: 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Newer comparison value (120 > 100): key 2 moves from segment1 to segment2
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 120, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // Out-of-order record (100 < 120): key 1 keeps its location, nothing changes
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // Equal comparison value (100 == 100): the newer record wins, key 0 moves to segment2
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, 100, false));

    // segment1: 1 -> {1, 120}
    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Add record should be no-op
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, 120, false));
    // segment1: 1 -> {1, 120}
    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+ @Test
+ public void testAddOutOfOrderRecord()
+ throws IOException {
+ verifyAddOutOfOrderRecord(HashFunction.NONE);
+ verifyAddOutOfOrderRecord(HashFunction.MD5);
+ verifyAddOutOfOrderRecord(HashFunction.MURMUR3);
+ }
+
  /**
   * Verifies the boolean returned by addRecord(): true for an in-order (applied) event, false for
   * an out-of-order (dropped) one. Notation: primaryKey -> {docId, comparisonValue}.
   */
  private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;

    // Add the first segment
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    int numRecords = 3;
    int[] primaryKeys = new int[]{0, 1, 2};
    int[] timestamps = new int[]{100, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl segment1 =
        mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());

    // Update records from the second segment
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);

    // new record, should return false for out of order event
    boolean isOutOfOrderRecord =
        !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, 100, false));
    assertFalse(isOutOfOrderRecord);

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    // segment2: 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // send an out-of-order event (80 < existing 100), should return true for orderness of event
    isOutOfOrderRecord = !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 80, false));
    assertTrue(isOutOfOrderRecord);

    // ordered event for an existing key (150 > 100): applied
    isOutOfOrderRecord = !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 150, false));
    assertFalse(isOutOfOrderRecord);

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 3 -> {0, 100}, 2 -> {1, 150}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // Close the metadata manager
    upsertMetadataManager.stop();
    upsertMetadataManager.close();
  }
+
+ @Test
+ public void testAddRecordWithDeleteColumn()
+ throws IOException {
+ verifyAddRecordWithDeleteColumn(HashFunction.NONE);
+ verifyAddRecordWithDeleteColumn(HashFunction.MD5);
+ verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
+ }
+
  /**
   * Verifies addRecord() with delete markers: a delete keeps the docId in validDocIds (the key is
   * still tracked) but drops it from queryableDocIds, and a later non-delete record revives the
   * key. Notation: primaryKey -> {docId, comparisonValue}.
   */
  private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;

    // queryableDocIds is same as validDocIds in the absence of delete markers
    // Add the first segment
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    int numRecords = 3;
    int[] primaryKeys = new int[]{0, 1, 2};
    int[] timestamps = new int[]{100, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl segment1 =
        mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());

    // Update records from the second segment
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    // segment2: 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Mark a record with latest value in segment1 as deleted
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 120, true));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
    // key 2's delete doc stays in validDocIds2 but is excluded from queryableDocIds2
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Mark a record with latest value in segment2 as deleted
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, 150, true));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {2, 150}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});

    // Revive a deleted primary key (by providing a larger comparisonValue)
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 3, 200, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {3, 200}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Add record should be no-op
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, 120, false));
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {3, 200}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+ @Test
+ public void testRemoveExpiredDeletedKeys()
+ throws IOException {
+ verifyRemoveExpiredDeletedKeys(HashFunction.NONE);
+ verifyRemoveExpiredDeletedKeys(HashFunction.MD5);
+ verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3);
+ }
+
  /**
   * Verifies removeExpiredPrimaryKeys() for deleted keys: a deleted key is only purged when it is
   * outside the deleted-keys TTL window AND all of its docs live in a single segment; otherwise it
   * is retained. Notation: primaryKey -> {docId, comparisonValue}.
   *
   * NOTE(review): several of the original inline state comments disagreed with the assertions
   * below; they have been corrected to match the asserted locations.
   */
  private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;

    // Add the first segment
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    int numRecords = 3;
    int[] primaryKeys = new int[]{0, 1, 2};
    int[] timestamps = new int[]{100, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl segment1 =
        mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
        getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());

    // Update records from the second segment
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    // segment2: 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Mark a record with latest value in segment1 as deleted (within TTL-window)
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 150, true));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 150}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Mark a record with latest value in segment2 as deleted (outside TTL window)
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, 120, true));
    // Mark a record with latest value in segment1 as deleted (outside TTL window)
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 3, 120, true));

    // now we have 3 records marked as deleted, one is within TTL window, rest 2 are outside TTL window
    // For the other 2, one key has data in one previous segment as well and one has all data for the keys
    // in the same segment.
    // segment1: 0 -> {0, 100}
    // segment2: 1 -> {3, 120}, 2 -> {1, 150}, 3 -> {2, 120}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 2, 120, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2, 3});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});

    // call delete-key workflow
    // key 2 will be there as it is within TTL window
    // key 1 will also be there as it is outside TTL window but data exists in one more segment
    // key 3 will be removed
    upsertMetadataManager.removeExpiredPrimaryKeys();
    // segment1: 0 -> {0, 100}
    // segment2: 1 -> {3, 120}, 2 -> {1, 150}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+ public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[]
deleteFlags,
+ MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap
queryableDocIds) {
+ ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
upsertMetadataManager =
+ new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
0,
+ _contextBuilder.build());
+
+ try (MockedConstruction<PinotSegmentColumnReader> deleteColReader =
mockConstruction(PinotSegmentColumnReader.class,
+ (mockReader, context) -> {
+ for (int i = 0; i < deleteFlags.length; i++) {
+ when(mockReader.isNull(i)).thenReturn(isDeleteColumnNull);
+ when(mockReader.getValue(i)).thenReturn(deleteFlags[i]);
+ }
+ })) {
+
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
+ when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+ this.put(COMPARISON_COLUMNS.get(0), columnMetadata);
+ }});
+
+ ImmutableSegmentImpl segment =
+ mockImmutableSegmentWithSegmentMetadata(1, new
ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
+ validDocIdsSnapshot);
+ assertEquals(upsertMetadataManager.getQueryableDocIds(segment,
validDocIdsSnapshot), queryableDocIds);
+ }
+ }
+
+ @Test
+ public void testHashPrimaryKey() {
+ PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
+ assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
+ "6ca926be8c2d1d980acf48ba48418e24");
+ assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
+ "e4540494e43b27e312d01f33208c6a4e");
+ // reorder
+ pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"});
+ assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
+ "fc2159b78d07f803fdfb0b727315a445");
+ assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
+ "37fab5ef0ea39711feabcdc623cb8a4e");
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
new file mode 100644
index 0000000000..daf10826c9
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class TableUpsertMetadataManagerFactoryTest {
+ private static final String RAW_TABLE_NAME = "testTable";
+ private TableConfig _tableConfig;
+
+ @Test
+ public void testCreateForDefaultManagerClass() {
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setHashFunction(HashFunction.NONE);
+ _tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build();
+ TableUpsertMetadataManager tableUpsertMetadataManager =
+ TableUpsertMetadataManagerFactory.create(_tableConfig, null);
+ assertNotNull(tableUpsertMetadataManager);
+ assertTrue(tableUpsertMetadataManager instanceof
ConcurrentMapTableUpsertMetadataManager);
+ }
+
+ @Test
+ public void testCreateForManagerClassWithConsistentDeletes() {
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setHashFunction(HashFunction.NONE);
+ upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+ _tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build();
+ TableUpsertMetadataManager tableUpsertMetadataManager =
+ TableUpsertMetadataManagerFactory.create(_tableConfig, null);
+ assertNotNull(tableUpsertMetadataManager);
+ assertTrue(tableUpsertMetadataManager instanceof
BaseTableUpsertMetadataManager);
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 8062b422f0..e35c671705 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1961,6 +1961,83 @@ public class TableConfigUtilsTest {
} catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be
a single-valued BOOLEAN column");
}
+
+ // test enableDeletedKeysCompactionConsistency shouldn't exist with
metadataTTL
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+ upsertConfig.setMetadataTTL(1.0);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "enableDeletedKeysCompactionConsistency and metadataTTL shouldn't
exist together for upsert table");
+ }
+
+ // test enableDeletedKeysCompactionConsistency shouldn't exist with
enablePreload
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+ upsertConfig.setEnablePreload(true);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "enableDeletedKeysCompactionConsistency and enablePreload shouldn't
exist together for upsert table");
+ }
+
+ // test enableDeletedKeysCompactionConsistency should exist with
deletedKeysTTL
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+ upsertConfig.setDeletedKeysTTL(0);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "enableDeletedKeysCompactionConsistency should exist with
deletedKeysTTL for upsert table");
+ }
+
+ // test enableDeletedKeysCompactionConsistency should exist with
enableSnapshot
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+ upsertConfig.setDeletedKeysTTL(100);
+ upsertConfig.setEnableSnapshot(false);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "enableDeletedKeysCompactionConsistency should exist with
enableSnapshot for upsert table");
+ }
+
+ // test enableDeletedKeysCompactionConsistency should exist with
UpsertCompactionTask
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+ upsertConfig.setDeletedKeysTTL(100);
+ upsertConfig.setEnableSnapshot(true);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "enableDeletedKeysCompactionConsistency should exist with
UpsertCompactionTask for upsert table");
+ }
}
@Test
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 51588c273c..3b453aca92 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -76,6 +76,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("TTL for upsert metadata cleanup for deleted keys,
it uses the same unit as comparison col")
private double _deletedKeysTTL;
+ @JsonPropertyDescription("If we are using deletionKeysTTL + compaction we
need to enable this for data consistency")
+ private boolean _enableDeletedKeysCompactionConsistency;
+
@JsonPropertyDescription("Whether to preload segments for fast upsert
metadata recovery")
private boolean _enablePreload;
@@ -160,6 +163,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _enablePreload;
}
+ public boolean isEnableDeletedKeysCompactionConsistency() {
+ return _enableDeletedKeysCompactionConsistency;
+ }
+
public ConsistencyMode getConsistencyMode() {
return _consistencyMode;
}
@@ -270,6 +277,10 @@ public class UpsertConfig extends BaseJsonConfig {
_dropOutOfOrderRecord = dropOutOfOrderRecord;
}
+ public void setEnableDeletedKeysCompactionConsistency(boolean
enableDeletedKeysCompactionConsistency) {
+ _enableDeletedKeysCompactionConsistency =
enableDeletedKeysCompactionConsistency;
+ }
+
public void setMetadataManagerClass(String metadataManagerClass) {
_metadataManagerClass = metadataManagerClass;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]