This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf41a0e5c7 Support Upsert deletion for TTL: construct queryableDocIds
when adding segments out of TTL (#11791)
cf41a0e5c7 is described below
commit cf41a0e5c76b1f9c33a5fa7a3ac362b52bee968c
Author: deemoliu <[email protected]>
AuthorDate: Thu Nov 2 23:25:53 2023 -0700
Support Upsert deletion for TTL: construct queryableDocIds when adding
segments out of TTL (#11791)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 31 +++-
.../segment/local/utils/TableConfigUtils.java | 4 -
...rrentMapPartitionUpsertMetadataManagerTest.java | 206 ++++++++++++++++++++-
.../segment/local/utils/TableConfigUtilsTest.java | 21 ---
4 files changed, 229 insertions(+), 33 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index ea9aa7f582..7d3de0346e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -50,6 +51,8 @@ import org.apache.pinot.segment.spi.V1Constants;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BooleanUtils;
+import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +126,28 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
return _primaryKeyColumns;
}
+ @Nullable
+ protected MutableRoaringBitmap getQueryableDocIds(IndexSegment segment,
MutableRoaringBitmap validDocIds) {
+ if (_deleteRecordColumn == null) {
+ return null;
+ }
+ MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();
+ try (PinotSegmentColumnReader deleteRecordColumnReader = new
PinotSegmentColumnReader(segment,
+ _deleteRecordColumn)) {
+ PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
+ while (docIdIterator.hasNext()) {
+ int docId = docIdIterator.next();
+ if (!BooleanUtils.toBoolean(deleteRecordColumnReader.getValue(docId)))
{
+ queryableDocIds.add(docId);
+ }
+ }
+ } catch (IOException e) {
+ _logger.error("Failed to close column reader for delete record column:
{} for segment: {} ", _deleteRecordColumn,
+ segment.getSegmentName(), e);
+ }
+ return queryableDocIds;
+ }
+
@Override
public void addSegment(ImmutableSegment segment) {
String segmentName = segment.getSegmentName();
@@ -140,15 +165,15 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot
enabled");
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
- // TODO: Support deletion for TTL. Need to construct queryableDocIds
when adding segments out of TTL.
- Preconditions.checkState(_deleteRecordColumn == null, "Upsert TTL
doesn't work with record deletion");
Number maxComparisonValue =
(Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue -
_metadataTTL) {
_logger.info("Skip adding segment: {} because it's out of TTL",
segmentName);
MutableRoaringBitmap validDocIdsSnapshot =
immutableSegment.loadValidDocIdsFromSnapshot();
if (validDocIdsSnapshot != null) {
- immutableSegment.enableUpsert(this, new
ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), null);
+ MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment,
validDocIdsSnapshot);
+ immutableSegment.enableUpsert(this, new
ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
+ new ThreadSafeMutableRoaringBitmap(queryableDocIds));
} else {
_logger.warn("Failed to find snapshot from segment: {} which is out
of TTL, treating all documents as valid",
segmentName);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 903d8aa1ac..f6d9ea957b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -755,10 +755,6 @@ public final class TableConfigUtils {
}
Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must
have snapshot enabled");
-
- // TODO: Support deletion for TTL. Need to construct queryableDocIds when
adding segments out of TTL.
- Preconditions.checkState(upsertConfig.getDeleteRecordColumn() == null,
- "Upsert TTL doesn't work with record deletion");
}
/**
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 963fae0474..feaa8da304 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -53,6 +54,7 @@ import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.mockito.MockedConstruction;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -62,6 +64,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -145,7 +148,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
}
@Test
- public void testUpsertMetadataCleanupWithTTLConfig() {
+ public void testUpsertMetadataCleanupWithTTLConfig()
+ throws IOException {
verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
@@ -156,6 +160,44 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddSegmentForTTL(new Double(80));
verifyAddSegmentForTTL(new Long(80));
verifyAddOutOfTTLSegment();
+ verifyAddOutOfTTLSegmentWithRecordDelete();
+ }
+
+ @Test
+ public void testGetQueryableDocIds() {
+ boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true,
false};
+ int[] docIds1 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+ MutableRoaringBitmap queryableDocIds1 = new MutableRoaringBitmap();
+ queryableDocIds1.add(new int[]{2, 5});
+ verifyGetQueryableDocIds(false, deleteFlags1, validDocIdsSnapshot1,
queryableDocIds1);
+
+ // all records are not deleted
+ boolean[] deleteFlags2 = new boolean[]{false, false, false, false, false,
false};
+ int[] docIds2 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+ validDocIdsSnapshot2.add(docIds2);
+ MutableRoaringBitmap queryableDocIds2 = new MutableRoaringBitmap();
+ queryableDocIds2.add(docIds2);
+ verifyGetQueryableDocIds(false, deleteFlags2, validDocIdsSnapshot2,
queryableDocIds2);
+
+ // delete column has null values
+ boolean[] deleteFlags3 = new boolean[]{false, false, false, false, false,
false};
+ int[] docIds3 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot3 = new MutableRoaringBitmap();
+ validDocIdsSnapshot3.add(docIds3);
+ MutableRoaringBitmap queryableDocIds3 = new MutableRoaringBitmap();
+ queryableDocIds3.add(docIds3);
+ verifyGetQueryableDocIds(true, deleteFlags3, validDocIdsSnapshot3,
queryableDocIds3);
+
+ // All records are deleted record.
+ boolean[] deleteFlags4 = new boolean[]{true, true, true, true, true, true};
+ int[] docIds4 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot4 = new MutableRoaringBitmap();
+ validDocIdsSnapshot4.add(docIds4);
+ MutableRoaringBitmap queryableDocIds4 = new MutableRoaringBitmap();
+ verifyGetQueryableDocIds(false, deleteFlags4, validDocIdsSnapshot4,
queryableDocIds4);
}
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction,
boolean enableSnapshot)
@@ -563,6 +605,15 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
return segment;
}
+ private static ImmutableSegmentImpl
mockImmutableSegmentWithSegmentMetadata(int sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata,
MutableRoaringBitmap snapshot) {
+ ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber,
validDocIds, queryableDocIds, primaryKeys);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
+ return segment;
+ }
+
private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
@@ -923,7 +974,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
- private void verifyRemoveExpiredPrimaryKeys(Comparable
earlierComparisonValue, Comparable largerComparisonValue) {
+ private void verifyRemoveExpiredPrimaryKeys(Comparable
earlierComparisonValue, Comparable largerComparisonValue)
+ throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -982,9 +1034,16 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
// ValidDocIds for out-of-ttl records should not be removed.
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2, 3});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
- private void verifyAddOutOfTTLSegment() {
+ private void verifyAddOutOfTTLSegment()
+ throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -1048,9 +1107,133 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment2);
// out of ttl segment should not be added to recordLocationMap
assertEquals(recordLocationMap.size(), 5);
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
- private void verifyAddSegmentForTTL(Comparable comparisonValue) {
+ private void verifyAddOutOfTTLSegmentWithRecordDelete()
+ throws IOException {
+ String comparisonColumn = "timeCol";
+ String deleteRecordColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteRecordColumn,
HashFunction.NONE, null, true, false, 30,
+ INDEX_DIR, mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+ // Add the first segment, it will not be skipped
+ int numRecords = 6;
+ int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+ int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+ boolean[] deleteFlags = new boolean[]{false, false, false, true, true,
false};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+
+ int[] docIds1 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1,
validDocIds1, queryableDocIds1, primaryKeys1,
+ Collections.singletonList(comparisonColumn), new Double(120),
validDocIdsSnapshot1);
+
+ // get recordInfo from validDocIdSnapshot.
+ // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ List<RecordInfo> recordInfoList1;
+ recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys,
timestamps, deleteFlags);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
recordInfoList1.iterator());
+ trackedSegments.add(segment1);
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 5});
+
+ // Add the second segment, it will be skipped.
+ numRecords = 5;
+ primaryKeys = new int[]{0, 1, 2, 3, 4};
+ timestamps = new int[]{40, 40, 40, 40, 40};
+ deleteFlags = new boolean[]{false, false, true, false, true};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+
+ int[] docIds2 = new int[]{3, 4};
+ validDocIdsSnapshot2.add(docIds2);
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithEndTime(2, validDocIds2, queryableDocIds2,
getPrimaryKeyList(numRecords, primaryKeys),
+ Collections.singletonList(comparisonColumn), new Double(40),
validDocIdsSnapshot2);
+
+ // get recordInfo from validDocIdSnapshot.
+ // segment2 snapshot: 3 -> {3, 40}, 4 -> {4, 40}
+ // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ List<RecordInfo> recordInfoList2;
+ recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys,
timestamps, deleteFlags);
+
+ upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2,
recordInfoList2.iterator());
+ trackedSegments.add(segment2);
+
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ // segment2: 3 -> {3, 40}, 4 -> {4, 40}
+ assertEquals(recordLocationMap.size(), 5);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 40,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 4, segment2, 4, 40,
HashFunction.NONE);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{3, 4});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 5});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{3});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
+ }
+
+ public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[]
deleteFlags,
+ MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap
queryableDocIds) {
+ String comparisonColumn = "timeCol";
+ String deleteRecordColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteRecordColumn,
HashFunction.NONE, null, true, false, 30,
+ INDEX_DIR, mock(ServerMetrics.class));
+
+ try (MockedConstruction<PinotSegmentColumnReader> deleteColReader =
mockConstruction(PinotSegmentColumnReader.class,
+ (mockReader, context) -> {
+ for (int i = 0; i < deleteFlags.length; i++) {
+ when(mockReader.isNull(i)).thenReturn(isDeleteColumnNull);
+ when(mockReader.getValue(i)).thenReturn(deleteFlags[i]);
+ }
+ })) {
+
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
+ when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+ this.put(comparisonColumn, columnMetadata);
+ }});
+ when(columnMetadata.getMaxValue()).thenReturn(null);
+
+ ImmutableSegmentImpl segment =
+ mockImmutableSegmentWithSegmentMetadata(1, new
ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
+ validDocIdsSnapshot);
+ assertEquals(upsertMetadataManager.getQueryableDocIds(segment,
validDocIdsSnapshot), queryableDocIds);
+ }
+ }
+
+ private void verifyAddSegmentForTTL(Comparable comparisonValue)
+ throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -1083,6 +1266,12 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment1);
assertEquals(recordLocationMap.size(), 1);
checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80,
HashFunction.NONE);
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
// Add the following utils function since the Comparison column is a long
value for TTL enabled upsert table.
@@ -1106,7 +1295,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(),
comparisonValue.doubleValue());
}
- private void verifyPersistAndLoadWatermark() {
+ private void verifyPersistAndLoadWatermark()
+ throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, true, false, 10, INDEX_DIR,
@@ -1118,6 +1308,12 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
double watermark = upsertMetadataManager.loadWatermark();
assertEquals(watermark, currentTimeMs);
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
@Test
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9655361b4c..1ad4cc74d6 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -2054,27 +2054,6 @@ public class TableConfigUtilsTest {
} catch (IllegalStateException e) {
// Expected
}
-
- // Invalid config with both delete and TTL enabled
- String delCol = "myDelCol";
- schema =
- new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
- .addSingleValueDimension(delCol, FieldSpec.DataType.STRING)
- .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
- upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- upsertConfig.setMetadataTTL(3600);
- upsertConfig.setEnableSnapshot(true);
- upsertConfig.setDeleteRecordColumn(delCol);
- TableConfig tableConfigWithBothDeleteAndTTL =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
- .setUpsertConfig(upsertConfig).build();
- try {
-
TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithBothDeleteAndTTL,
schema);
- Assert.fail();
- } catch (IllegalStateException e) {
- // Expected
- }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]