This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e4a9f15697 Add watermark for dedup TTL (#14137)
e4a9f15697 is described below
commit e4a9f15697b9e13eeab60a0c89c60acb753af72e
Author: Xiaobing <[email protected]>
AuthorDate: Wed Oct 2 09:30:24 2024 -0700
Add watermark for dedup TTL (#14137)
* store/load watermark for dedup ttl
* skip segments out of TTL for all add/replace/remove methods
---
.../dedup/BasePartitionDedupMetadataManager.java | 69 ++++++++++++-----
...ConcurrentMapPartitionDedupMetadataManager.java | 11 ++-
.../ConcurrentMapTableDedupMetadataManager.java | 1 -
.../upsert/BasePartitionUpsertMetadataManager.java | 66 ++--------------
.../pinot/segment/local/utils/WatermarkUtils.java | 88 ++++++++++++++++++++++
...apPartitionDedupMetadataManagerWithTTLTest.java | 31 +++++---
...artitionDedupMetadataManagerWithoutTTLTest.java | 16 +++-
.../mutable/MutableSegmentDedupeTest.java | 16 ++--
...rrentMapPartitionUpsertMetadataManagerTest.java | 5 +-
9 files changed, 197 insertions(+), 106 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
index 3009875c3e..3892d36d92 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.dedup;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -30,7 +31,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.slf4j.Logger;
@@ -38,6 +41,8 @@ import org.slf4j.LoggerFactory;
public abstract class BasePartitionDedupMetadataManager implements
PartitionDedupMetadataManager {
+ // The special value to indicate the largest seen time is not set yet,
assuming times are positive.
+ protected static final double TTL_WATERMARK_NOT_SET = 0;
protected final String _tableNameWithType;
protected final List<String> _primaryKeyColumns;
protected final int _partitionId;
@@ -45,7 +50,8 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
protected final HashFunction _hashFunction;
protected final double _metadataTTL;
protected final String _dedupTimeColumn;
- protected final AtomicDouble _largestSeenTime = new AtomicDouble(0);
+ protected final AtomicDouble _largestSeenTime;
+ protected final File _tableIndexDir;
protected final Logger _logger;
// The following variables are always accessed within synchronized block
private boolean _stopped;
@@ -61,12 +67,17 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
_serverMetrics = dedupContext.getServerMetrics();
_metadataTTL = dedupContext.getMetadataTTL() >= 0 ?
dedupContext.getMetadataTTL() : 0;
_dedupTimeColumn = dedupContext.getDedupTimeColumn();
+ _tableIndexDir = dedupContext.getTableIndexDir();
+ _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
Preconditions.checkArgument(_dedupTimeColumn != null,
"When metadataTTL is configured, metadata time column must be
configured for dedup enabled table: %s",
tableNameWithType);
+ _largestSeenTime = new
AtomicDouble(WatermarkUtils.loadWatermark(getWatermarkFile(),
TTL_WATERMARK_NOT_SET));
+ } else {
+ _largestSeenTime = new AtomicDouble(TTL_WATERMARK_NOT_SET);
+ WatermarkUtils.deleteWatermark(getWatermarkFile());
}
- _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
}
@Override
@@ -85,17 +96,6 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: %s for segment: %s, table:
%s", segment.getClass(), segmentName,
_tableNameWithType);
- // If metadataTTL is enabled, we can skip adding segment that's already
getting out of the TTL.
- if (_metadataTTL > 0) {
- double maxDedupTime = ((Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_dedupTimeColumn)
- .getMaxValue()).doubleValue();
- _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
- if (isOutOfMetadataTTL(maxDedupTime)) {
- _logger.info("Skip adding segment: {} as max dedupTime: {} is out of
metadataTTL: {}", segmentName,
- _dedupTimeColumn, _metadataTTL);
- return;
- }
- }
if (!startOperation()) {
_logger.info("Skip adding segment: {} because dedup metadata manager is
already stopped",
segment.getSegmentName());
@@ -133,6 +133,17 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
private void addOrReplaceSegment(@Nullable IndexSegment oldSegment,
IndexSegment newSegment)
throws IOException {
+ // If metadataTTL is enabled, we can skip adding dedup metadata for
segment that's already out of the TTL.
+ if (_metadataTTL > 0) {
+ double maxDedupTime = getMaxDedupTime(newSegment);
+ _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
+ if (isOutOfMetadataTTL(maxDedupTime)) {
+ String action = oldSegment == null ? "adding" : "replacing";
+ _logger.info("Skip {} segment: {} as max dedupTime: {} is out of TTL:
{}", action, newSegment.getSegmentName(),
+ maxDedupTime, _metadataTTL);
+ return;
+ }
+ }
try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new
DedupUtils.DedupRecordInfoReader(newSegment,
_primaryKeyColumns, _dedupTimeColumn)) {
Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -158,6 +169,15 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
_logger.info("Skip removing segment: {} because metadata manager is
already stopped", segment.getSegmentName());
return;
}
+ // Skip removing the dedup metadata of segment out of TTL. The expired
metadata is removed in batches.
+ if (_metadataTTL > 0) {
+ double maxDedupTime = getMaxDedupTime(segment);
+ if (isOutOfMetadataTTL(maxDedupTime)) {
+ _logger.info("Skip removing segment: {} as max dedupTime: {} is out of
TTL: {}", segment.getSegmentName(),
+ maxDedupTime, _metadataTTL);
+ return;
+ }
+ }
try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new
DedupUtils.DedupRecordInfoReader(segment,
_primaryKeyColumns, _dedupTimeColumn)) {
Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -175,8 +195,26 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
protected abstract void doRemoveSegment(IndexSegment segment,
Iterator<DedupRecordInfo> dedupRecordInfoIterator);
+ protected boolean isOutOfMetadataTTL(double dedupTime) {
+ return _metadataTTL > 0 && dedupTime < _largestSeenTime.get() -
_metadataTTL;
+ }
+
+ protected double getMaxDedupTime(IndexSegment segment) {
+ return ((Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_dedupTimeColumn)
+ .getMaxValue()).doubleValue();
+ }
+
+ protected File getWatermarkFile() {
+ // Use 'dedup' suffix to avoid conflicts with upsert watermark file, as
it's possible that a table is changed
+ // from using dedup to upsert and the watermark should be re-calculated
based on upsert comparison column.
+ return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION
+ _partitionId + ".dedup");
+ }
+
@Override
public void removeExpiredPrimaryKeys() {
+ if (_metadataTTL <= 0) {
+ return;
+ }
if (!startOperation()) {
_logger.info("Skip removing expired primary keys because metadata
manager is already stopped");
return;
@@ -184,6 +222,7 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
try {
long startTime = System.currentTimeMillis();
doRemoveExpiredPrimaryKeys();
+ WatermarkUtils.persistWatermark(_largestSeenTime.get(),
getWatermarkFile());
long duration = System.currentTimeMillis() - startTime;
_serverMetrics.addTimedTableValue(_tableNameWithType,
ServerTimer.DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS,
duration, TimeUnit.MILLISECONDS);
@@ -251,10 +290,6 @@ public abstract class BasePartitionDedupMetadataManager
implements PartitionDedu
_logger.info("Closed the metadata manager");
}
- protected boolean isOutOfMetadataTTL(double dedupTime) {
- return _metadataTTL > 0 && dedupTime < _largestSeenTime.get() -
_metadataTTL;
- }
-
protected abstract long getNumPrimaryKeys();
protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index 5cfb2cea42..4461266e35 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -45,7 +45,6 @@ class ConcurrentMapPartitionDedupMetadataManager extends
BasePartitionDedupMetad
while (dedupRecordInfoIteratorOfNewSegment.hasNext()) {
DedupRecordInfo dedupRecordInfo =
dedupRecordInfoIteratorOfNewSegment.next();
double dedupTime = dedupRecordInfo.getDedupTime();
- _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupTime));
_primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(),
_hashFunction),
(primaryKey, segmentAndTime) -> {
// Stale metadata is treated as not existing when checking for
deduplicates.
@@ -94,10 +93,8 @@ class ConcurrentMapPartitionDedupMetadataManager extends
BasePartitionDedupMetad
@Override
protected void doRemoveExpiredPrimaryKeys() {
- if (_metadataTTL > 0) {
- double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL;
- _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry ->
entry.getValue().getRight() < smallestTimeToKeep);
- }
+ double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL;
+ _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry ->
entry.getValue().getRight() < smallestTimeToKeep);
}
@Override
@@ -108,7 +105,9 @@ class ConcurrentMapPartitionDedupMetadataManager extends
BasePartitionDedupMetad
return true;
}
try {
- _largestSeenTime.getAndUpdate(time -> Math.max(time,
dedupRecordInfo.getDedupTime()));
+ if (_metadataTTL > 0) {
+ _largestSeenTime.getAndUpdate(time -> Math.max(time,
dedupRecordInfo.getDedupTime()));
+ }
AtomicBoolean present = new AtomicBoolean(false);
_primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(),
_hashFunction),
(primaryKey, segmentAndTime) -> {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
index 4bb7929ffe..ea9ac9117b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.local.dedup;
-
class ConcurrentMapTableDedupMetadataManager extends
BaseTableDedupMetadataManager {
protected PartitionDedupMetadataManager
createPartitionDedupMetadataManager(Integer partitionId) {
return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType,
partitionId, _dedupContext);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 18142e2981..9a868b34c5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -21,12 +21,8 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -45,7 +41,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.Utils;
@@ -66,6 +61,7 @@ import
org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
@@ -184,10 +180,11 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
Preconditions.checkState(_metadataTTL <= 0 || _enableSnapshot, "Upsert
metadata TTL must have snapshot enabled");
- _largestSeenComparisonValue = new AtomicDouble(loadWatermark());
+ _largestSeenComparisonValue =
+ new AtomicDouble(WatermarkUtils.loadWatermark(getWatermarkFile(),
TTL_WATERMARK_NOT_SET));
} else {
_largestSeenComparisonValue = new AtomicDouble(TTL_WATERMARK_NOT_SET);
- deleteWatermark();
+ WatermarkUtils.deleteWatermark(getWatermarkFile());
}
}
@@ -1013,7 +1010,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// updated validDocIds bitmaps. If the TTL watermark is persisted first,
segments out of TTL may get loaded with
// stale bitmaps or even no bitmap snapshots to use.
if (isTTLEnabled()) {
- persistWatermark(_largestSeenComparisonValue.get());
+ WatermarkUtils.persistWatermark(_largestSeenComparisonValue.get(),
getWatermarkFile());
}
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
@@ -1030,59 +1027,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
numConsumingSegments, System.currentTimeMillis() - startTimeMs);
}
- /**
- * Loads watermark from the file if exists.
- */
- protected double loadWatermark() {
- File watermarkFile = getWatermarkFile();
- if (watermarkFile.exists()) {
- try {
- byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
- double watermark = ByteBuffer.wrap(bytes).getDouble();
- _logger.info("Loaded watermark: {} from file for table: {}
partition_id: {}", watermark, _tableNameWithType,
- _partitionId);
- return watermark;
- } catch (Exception e) {
- _logger.warn("Caught exception while loading watermark file: {},
skipping", watermarkFile);
- }
- }
- return TTL_WATERMARK_NOT_SET;
- }
-
- /**
- * Persists watermark to the file.
- */
- protected void persistWatermark(double watermark) {
- File watermarkFile = getWatermarkFile();
- try {
- if (watermarkFile.exists()) {
- if (!FileUtils.deleteQuietly(watermarkFile)) {
- _logger.warn("Cannot delete watermark file: {}, skipping",
watermarkFile);
- return;
- }
- }
- try (OutputStream outputStream = new FileOutputStream(watermarkFile,
false);
- DataOutputStream dataOutputStream = new
DataOutputStream(outputStream)) {
- dataOutputStream.writeDouble(watermark);
- }
- _logger.info("Persisted watermark: {} to file: {}", watermark,
watermarkFile);
- } catch (Exception e) {
- _logger.warn("Caught exception while persisting watermark file: {},
skipping", watermarkFile);
- }
- }
-
- /**
- * Deletes the watermark file.
- */
- protected void deleteWatermark() {
- File watermarkFile = getWatermarkFile();
- if (watermarkFile.exists()) {
- if (!FileUtils.deleteQuietly(watermarkFile)) {
- _logger.warn("Cannot delete watermark file: {}, skipping",
watermarkFile);
- }
- }
- }
-
protected File getWatermarkFile() {
return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION
+ _partitionId);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java
new file mode 100644
index 0000000000..d19591db4e
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility methods to manage the TTL watermark file for dedup and upsert tables, as both share very similar logic.
+ * <p>
+ * The watermark is stored as a single {@code double} written with {@link DataOutputStream#writeDouble(double)} and
+ * read back with {@link ByteBuffer#getDouble()}.
+ */
+public class WatermarkUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(WatermarkUtils.class);
+  // Suffix of the sibling scratch file a new watermark is written to before it replaces the real watermark file.
+  private static final String TMP_FILE_SUFFIX = ".tmp";
+
+  private WatermarkUtils() {
+  }
+
+  /**
+   * Loads the watermark from the given file if the file exists.
+   *
+   * @param watermarkFile file previously written by {@link #persistWatermark(double, File)}
+   * @param defaultWatermark value to return when the file does not exist or cannot be read
+   * @return the persisted watermark, or {@code defaultWatermark}
+   */
+  public static double loadWatermark(File watermarkFile, double defaultWatermark) {
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        LOGGER.info("Loaded watermark: {} from file: {}", watermark, watermarkFile);
+        return watermark;
+      } catch (Exception e) {
+        // Best-effort: fall back to the default value, but keep the cause (e.g. a truncated file) in the log.
+        LOGGER.warn("Failed to load watermark from file: {}, skipping", watermarkFile, e);
+      }
+    }
+    return defaultWatermark;
+  }
+
+  /**
+   * Persists the watermark to the given file, replacing any previously persisted value.
+   * <p>
+   * The value is first written to a sibling temp file, which is then moved over {@code watermarkFile}. When the move
+   * is atomic, a process crash in the middle of this method leaves either the old or the new watermark in place. It
+   * never leaves a missing or truncated file, as deleting and rewriting the file in place could. Failures are logged
+   * and swallowed, because persisting the watermark is best-effort.
+   *
+   * @param watermark value to persist
+   * @param watermarkFile destination file
+   */
+  public static void persistWatermark(double watermark, File watermarkFile) {
+    File tmpFile = getTmpFile(watermarkFile);
+    try {
+      try (OutputStream outputStream = new FileOutputStream(tmpFile, false);
+          DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
+        dataOutputStream.writeDouble(watermark);
+      }
+      moveReplacing(tmpFile, watermarkFile);
+      LOGGER.info("Persisted watermark: {} to file: {}", watermark, watermarkFile);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to persist watermark: {} to file: {}, skipping", watermark, watermarkFile, e);
+      // Don't leave a partially written scratch file behind.
+      FileUtils.deleteQuietly(tmpFile);
+    }
+  }
+
+  /**
+   * Deletes the watermark file, plus any scratch file left behind by an interrupted
+   * {@link #persistWatermark(double, File)}. Failures are logged and ignored.
+   *
+   * @param watermarkFile file to delete
+   */
+  public static void deleteWatermark(File watermarkFile) {
+    if (watermarkFile.exists()) {
+      if (!FileUtils.deleteQuietly(watermarkFile)) {
+        LOGGER.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+      }
+    }
+    FileUtils.deleteQuietly(getTmpFile(watermarkFile));
+  }
+
+  // Returns the sibling scratch file used by persistWatermark() for the given watermark file.
+  private static File getTmpFile(File watermarkFile) {
+    return new File(watermarkFile.getPath() + TMP_FILE_SUFFIX);
+  }
+
+  // Moves source over target, atomically if the file system supports it, otherwise with a plain replacing move.
+  private static void moveReplacing(File source, File target)
+      throws IOException {
+    try {
+      Files.move(source.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE,
+          StandardCopyOption.REPLACE_EXISTING);
+    } catch (AtomicMoveNotSupportedException e) {
+      Files.move(source.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
index c7855c1eb3..c0697eb4c3 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -30,6 +31,7 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -38,6 +40,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -50,17 +53,27 @@ import static org.testng.Assert.assertTrue;
public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
+
ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.class.getSimpleName());
private static final int METADATA_TTL = 10000;
private static final String DEDUP_TIME_COLUMN_NAME = "dedupTimeColumn";
private DedupContext.Builder _dedupContextBuilder;
@BeforeMethod
- public void setUpContextBuilder() {
+ public void setUpContextBuilder()
+ throws IOException {
+ FileUtils.forceMkdir(TEMP_DIR);
_dedupContextBuilder = new DedupContext.Builder();
_dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setMetadataTTL(METADATA_TTL)
.setDedupTimeColumn(DEDUP_TIME_COLUMN_NAME).setTableIndexDir(mock(File.class))
-
.setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class));
+
.setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
+ .setTableIndexDir(TEMP_DIR);
+ }
+
+ @AfterMethod
+ public void cleanup() {
+ FileUtils.deleteQuietly(TEMP_DIR);
}
@Test
@@ -134,7 +147,6 @@ public class
ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
metadataManager.doRemoveSegment(segment, dedupRecordInfoIterator);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0);
- assertEquals(metadataManager._largestSeenTime.get(), 9000);
metadataManager.stop();
metadataManager.close();
@@ -204,7 +216,6 @@ public class
ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
private void
verifyInitialSegmentAddition(ConcurrentMapPartitionDedupMetadataManager
metadataManager,
IndexSegment segment, HashFunction hashFunction) {
- assertEquals(metadataManager._largestSeenTime.get(), 9000);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
verifyInMemoryState(metadataManager, 0, 10, segment, hashFunction);
}
@@ -245,23 +256,23 @@ public class
ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
metadataManager.doAddOrReplaceSegment(null, segment2,
dedupRecordInfoIterator2);
+
+ metadataManager._largestSeenTime.set(19000);
metadataManager.removeExpiredPrimaryKeys();
assertEquals(metadataManager.getNumPrimaryKeys(), 11);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
verifyInMemoryState(metadataManager, 9, 1, segment1, hashFunction);
verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction);
- assertEquals(metadataManager._largestSeenTime.get(), 19000);
+
assertEquals(WatermarkUtils.loadWatermark(metadataManager.getWatermarkFile(),
-1), 19000);
dedupRecordInfoIterator1 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction);
- assertEquals(metadataManager._largestSeenTime.get(), 19000);
dedupRecordInfoIterator2 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
assertTrue(metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
- assertEquals(metadataManager._largestSeenTime.get(), 19000);
metadataManager.stop();
metadataManager.close();
@@ -294,22 +305,21 @@ public class
ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
metadataManager.doAddOrReplaceSegment(null, segment2,
dedupRecordInfoIterator2);
+ metadataManager._largestSeenTime.set(19000);
metadataManager.removeExpiredPrimaryKeys();
assertEquals(metadataManager.getNumPrimaryKeys(), 11);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction);
- assertEquals(metadataManager._largestSeenTime.get(), 19000);
+
assertEquals(WatermarkUtils.loadWatermark(metadataManager.getWatermarkFile(),
-1), 19000);
dedupRecordInfoIterator2 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
verifyInMemoryState(metadataManager, 9, 1, segment1, hashFunction);
- assertEquals(metadataManager._largestSeenTime.get(), 19000);
dedupRecordInfoIterator1 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
assertTrue(metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
- assertEquals(metadataManager._largestSeenTime.get(), 19000);
metadataManager.stop();
metadataManager.close();
@@ -351,7 +361,6 @@ public class
ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
Pair.of(immutableSegment, 25000.0));
- assertEquals(metadataManager._largestSeenTime.get(), 25000);
metadataManager.stop();
metadataManager.close();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
index bab3682b63..f7eadeaf09 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -36,6 +37,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -45,14 +47,24 @@ import static org.testng.Assert.assertSame;
public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
+
ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.class.getSimpleName());
private DedupContext.Builder _dedupContextBuilder;
@BeforeMethod
- public void setUpContextBuilder() {
+ public void setUpContextBuilder()
+ throws IOException {
+ FileUtils.forceMkdir(TEMP_DIR);
_dedupContextBuilder = new DedupContext.Builder();
_dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setTableIndexDir(mock(File.class))
-
.setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class));
+
.setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
+ .setTableIndexDir(TEMP_DIR);
+ }
+
+ @AfterMethod
+ public void cleanup() {
+ FileUtils.deleteQuietly(TEMP_DIR);
}
@Test
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 36c5627902..8c4594dc91 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -26,6 +26,7 @@ import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -49,6 +50,8 @@ import org.testng.annotations.Test;
public class MutableSegmentDedupeTest {
+ private static final File TEMP_DIR =
+ new File(FileUtils.getTempDirectory(),
MutableSegmentDedupeTest.class.getSimpleName());
private static final String SCHEMA_FILE_PATH = "data/test_dedup_schema.json";
private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
private MutableSegmentImpl _mutableSegmentImpl;
@@ -88,17 +91,18 @@ public class MutableSegmentDedupeTest {
TableConfig tableConfig = Mockito.mock(TableConfig.class);
Mockito.when(tableConfig.getTableName()).thenReturn("testTable_REALTIME");
Mockito.when(tableConfig.getDedupConfig()).thenReturn(dedupConfig);
- SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig
- = Mockito.mock(SegmentsValidationAndRetentionConfig.class);
+ SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+ Mockito.mock(SegmentsValidationAndRetentionConfig.class);
Mockito.when(tableConfig.getValidationConfig()).thenReturn(segmentsValidationAndRetentionConfig);
Mockito.when(segmentsValidationAndRetentionConfig.getTimeColumnName()).thenReturn("secondsSinceEpoch");
TableDataManager tableDataManager = Mockito.mock(TableDataManager.class);
-
Mockito.when(tableDataManager.getTableDataDir()).thenReturn(Mockito.mock(File.class));
+ Mockito.when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR);
return TableDedupMetadataManagerFactory.create(tableConfig, schema,
tableDataManager,
Mockito.mock(ServerMetrics.class));
}
- public List<Map<String, String>> loadJsonFile(String filePath) throws
IOException {
+ public List<Map<String, String>> loadJsonFile(String filePath)
+ throws IOException {
URL resourceUrl = this.getClass().getClassLoader().getResource(filePath);
if (resourceUrl == null) {
throw new IllegalArgumentException("File not found: " + filePath);
@@ -165,8 +169,8 @@ public class MutableSegmentDedupeTest {
}
}
- private void verifyGeneratedSegmentDataAgainstRawData(
- int docId, int rawDataIndex, List<Map<String, String>> rawData) {
+ private void verifyGeneratedSegmentDataAgainstRawData(int docId, int
rawDataIndex,
+ List<Map<String, String>> rawData) {
for (String columnName : rawData.get(0).keySet()) {
Assert.assertEquals(String.valueOf(_mutableSegmentImpl.getValue(docId,
columnName)),
String.valueOf(rawData.get(rawDataIndex).get(columnName)));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 278f0f5ef5..90c03bb5de 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -40,6 +40,7 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
@@ -228,10 +229,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, _contextBuilder.build());
double currentTimeMs = System.currentTimeMillis();
- upsertMetadataManager.persistWatermark(currentTimeMs);
+ WatermarkUtils.persistWatermark(currentTimeMs,
upsertMetadataManager.getWatermarkFile());
assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION +
0).exists());
- double watermark = upsertMetadataManager.loadWatermark();
+ double watermark =
WatermarkUtils.loadWatermark(upsertMetadataManager.getWatermarkFile(), -1);
assertEquals(watermark, currentTimeMs);
ImmutableSegmentImpl segment =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]