This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 983d3563ef add metadata ttl field to DedupConfig (#13636)
983d3563ef is described below
commit 983d3563ef9717648a24cb31e9bc2545054226c3
Author: Haitao Zhang <[email protected]>
AuthorDate: Fri Aug 16 12:09:05 2024 -0700
add metadata ttl field to DedupConfig (#13636)
* add metadata ttl field to DedupConfig
* implement dedup metadata ttl handling logic in
ConcurrentMapPartitionDedupMetadataManager
---
.../apache/pinot/common/metrics/ServerTimer.java | 5 +-
.../common/utils/config/TableConfigSerDeTest.java | 30 ++-
.../realtime/RealtimeSegmentDataManager.java | 9 +-
.../manager/realtime/RealtimeTableDataManager.java | 31 ++-
.../dedup/BasePartitionDedupMetadataManager.java | 246 +++++++++++++++++
.../local/dedup/BaseTableDedupMetadataManager.java | 61 ++++-
...ConcurrentMapPartitionDedupMetadataManager.java | 171 ++++++------
.../ConcurrentMapTableDedupMetadataManager.java | 5 +-
.../pinot/segment/local/dedup/DedupContext.java | 159 +++++++++++
...upMetadataManager.java => DedupRecordInfo.java} | 25 +-
.../pinot/segment/local/dedup/DedupUtils.java | 95 +++++++
.../local/dedup/PartitionDedupMetadataManager.java | 44 +++-
.../local/dedup/TableDedupMetadataManager.java | 8 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 21 +-
.../local/realtime/impl/RealtimeSegmentConfig.java | 16 +-
.../local/segment/readers/PrimaryKeyReader.java | 69 +++++
.../upsert/BasePartitionUpsertMetadataManager.java | 4 +-
...nUpsertMetadataManagerForConsistentDeletes.java | 3 +-
.../pinot/segment/local/upsert/UpsertUtils.java | 51 +---
...apPartitionDedupMetadataManagerWithTTLTest.java | 291 +++++++++++++++++++++
...artitionDedupMetadataManagerWithoutTTLTest.java | 158 +++++++++++
.../pinot/segment/local/dedup/DedupTestUtils.java | 57 ++++
.../dedup/PartitionDedupMetadataManagerTest.java | 171 ------------
.../mutable/MutableSegmentDedupeTest.java | 93 ++++++-
.../mutable/MutableSegmentImplTestUtils.java | 17 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 2 +-
.../mutable/MutableSegmentImplUpsertTest.java | 2 +-
.../src/test/resources/data/test_dedup_data.json | 12 +-
.../src/test/resources/data/test_dedup_schema.json | 4 +
.../apache/pinot/spi/config/table/DedupConfig.java | 40 ++-
.../apache/pinot/spi/config/table/TableConfig.java | 10 +
31 files changed, 1552 insertions(+), 358 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 79e1eff8e0..b3e5e70641 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -52,10 +52,13 @@ public enum ServerTimer implements AbstractMetrics.Timer {
UPSERT_PRELOAD_TIME_MS("milliseconds", false,
"Total time taken to preload a table partition of an upsert table with
upsert snapshot"),
UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
- "Total time taken to delete expired primary keys based on metadataTTL or
deletedKeysTTL"),
+ "Total time taken to delete expired upsert primary keys based on
metadataTTL or deletedKeysTTL"),
GRPC_QUERY_EXECUTION_MS("milliseconds", false, "Total execution time of a
successful query over gRPC"),
UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take
upsert table snapshot"),
+ DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
+ "Total time taken to delete expired dedup primary keys based on
metadataTTL or deletedKeysTTL"),
+
// Multi-stage
/**
* Time spent building the hash table for the join.
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index dc9235d793..47de3d6225 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -259,12 +259,22 @@ public class TableConfigSerDeTest {
checkTableConfigWithUpsertConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
- // with dedup config
+ // with dedup config - without metadata ttl and metadata time column
DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5);
TableConfig tableConfig =
tableConfigBuilder.setDedupConfig(dedupConfig).build();
// Serialize then de-serialize
-
checkTableConfigWithDedupConfig(JsonUtils.stringToObject(tableConfig.toJsonString(),
TableConfig.class));
-
checkTableConfigWithDedupConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
+ checkTableConfigWithDedupConfigWithoutTTL(
+ JsonUtils.stringToObject(tableConfig.toJsonString(),
TableConfig.class));
+ checkTableConfigWithDedupConfigWithoutTTL(
+
TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
+ }
+ {
+ // with dedup config - with metadata ttl and metadata time column
+ DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null,
null, 10, "dedupTimeColumn");
+ TableConfig tableConfig =
tableConfigBuilder.setDedupConfig(dedupConfig).build();
+ // Serialize then de-serialize
+
checkTableConfigWithDedupConfigWithTTL(JsonUtils.stringToObject(tableConfig.toJsonString(),
TableConfig.class));
+
checkTableConfigWithDedupConfigWithTTL(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// with SegmentsValidationAndRetentionConfig
@@ -556,11 +566,23 @@ public class TableConfigSerDeTest {
assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
}
- private void checkTableConfigWithDedupConfig(TableConfig tableConfig) {
+ private void checkTableConfigWithDedupConfigWithoutTTL(TableConfig
tableConfig) {
+ DedupConfig dedupConfig = tableConfig.getDedupConfig();
+ assertNotNull(dedupConfig);
+
+ assertTrue(dedupConfig.isDedupEnabled());
+ assertEquals(dedupConfig.getHashFunction(), HashFunction.MD5);
+ assertEquals(dedupConfig.getMetadataTTL(), 0);
+ assertNull(dedupConfig.getDedupTimeColumn());
+ }
+
+ private void checkTableConfigWithDedupConfigWithTTL(TableConfig tableConfig)
{
DedupConfig dedupConfig = tableConfig.getDedupConfig();
assertNotNull(dedupConfig);
assertTrue(dedupConfig.isDedupEnabled());
assertEquals(dedupConfig.getHashFunction(), HashFunction.MD5);
+ assertEquals(dedupConfig.getMetadataTTL(), 10);
+ assertEquals(dedupConfig.getDedupTimeColumn(), "dedupTimeColumn");
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 73915c5a2e..b04bf4a36e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -250,6 +250,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final AtomicBoolean _acquiredConsumerSemaphore;
private final ServerMetrics _serverMetrics;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final BooleanSupplier _isReadyToConsumeData;
private final MutableSegmentImpl _realtimeSegment;
private volatile StreamPartitionMsgOffset _currentOffset; // Next offset to
be consumed
@@ -722,6 +723,10 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
}
+ if (_partitionDedupMetadataManager != null &&
_tableConfig.getDedupMetadataTTL() > 0) {
+ _partitionDedupMetadataManager.removeExpiredPrimaryKeys();
+ }
+
while (!_state.isFinal()) {
if (_state.shouldConsume()) {
consumeLoop(); // Consume until we reached the end criteria, or
we are stopped.
@@ -1441,6 +1446,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_schema = schema;
_serverMetrics = serverMetrics;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
+ _partitionDedupMetadataManager = partitionDedupMetadataManager;
_isReadyToConsumeData = isReadyToConsumeData;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getInstanceId();
@@ -1560,11 +1566,12 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
.setNullHandlingEnabled(_nullHandlingEnabled)
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
- .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
.setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn())
.setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
+ .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
+ .setDedupTimeColumn(tableConfig.getDedupTimeColumn())
.setFieldConfigList(tableConfig.getFieldConfigList());
// Create message decoder
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8393da3884..8a99610c75 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -248,12 +248,22 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Make sure we do metric cleanup when we shut down the table.
// Do this first, so we do not show ingestion lag during shutdown.
_ingestionDelayTracker.shutdown();
- if (_tableUpsertMetadataManager != null) {
+ if (_tableUpsertMetadataManager != null || _tableDedupMetadataManager !=
null) {
// Stop the upsert metadata manager first to prevent removing metadata
when destroying segments
- _tableUpsertMetadataManager.stop();
+ if (_tableUpsertMetadataManager != null) {
+ _tableUpsertMetadataManager.stop();
+ }
+ if (_tableDedupMetadataManager != null) {
+ _tableDedupMetadataManager.stop();
+ }
releaseAndRemoveAllSegments();
try {
- _tableUpsertMetadataManager.close();
+ if (_tableUpsertMetadataManager != null) {
+ _tableUpsertMetadataManager.close();
+ }
+ if (_tableDedupMetadataManager != null) {
+ _tableDedupMetadataManager.close();
+ }
} catch (IOException e) {
_logger.warn("Caught exception while closing upsert metadata manager",
e);
}
@@ -545,14 +555,13 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return;
}
- // TODO: Change dedup handling to handle segment replacement
if (isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl) {
- buildDedupMeta((ImmutableSegmentImpl) immutableSegment);
+ handleDedup((ImmutableSegmentImpl) immutableSegment);
}
super.addSegment(immutableSegment);
}
- private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) {
+ private void handleDedup(ImmutableSegmentImpl immutableSegment) {
// TODO(saurabh) refactor commons code with handleUpsert
String segmentName = immutableSegment.getSegmentName();
Integer partitionGroupId =
@@ -563,7 +572,15 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
immutableSegment.enableDedup(partitionDedupMetadataManager);
- partitionDedupMetadataManager.addSegment(immutableSegment);
+ SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
+ if (oldSegmentManager != null) {
+ LOGGER.info("Replacing mutable segment: {} with immutable segment: {} in
partition dedup metadata manager",
+ oldSegmentManager.getSegment().getSegmentName(), segmentName);
+
partitionDedupMetadataManager.replaceSegment(oldSegmentManager.getSegment(),
immutableSegment);
+ } else {
+ LOGGER.info("Adding immutable segment: {} to partition dedup metadata
manager", segmentName);
+ partitionDedupMetadataManager.addSegment(immutableSegment);
+ }
}
private void handleUpsert(ImmutableSegment immutableSegment) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
new file mode 100644
index 0000000000..b711fdce44
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BasePartitionDedupMetadataManager implements
PartitionDedupMetadataManager {
+ protected final String _tableNameWithType;
+ protected final List<String> _primaryKeyColumns;
+ protected final int _partitionId;
+ protected final ServerMetrics _serverMetrics;
+ protected final HashFunction _hashFunction;
+ protected final double _metadataTTL;
+ protected final String _dedupTimeColumn;
+ protected final Logger _logger;
+
+ // The following variables are always accessed within synchronized block
+ private boolean _stopped;
+ // Initialize with 1 pending operation to indicate the metadata manager can
take more operations
+ private int _numPendingOperations = 1;
+ private boolean _closed;
+
+ protected BasePartitionDedupMetadataManager(String tableNameWithType, int
partitionId, DedupContext dedupContext) {
+ _tableNameWithType = tableNameWithType;
+ _partitionId = partitionId;
+ _primaryKeyColumns = dedupContext.getPrimaryKeyColumns();
+ _hashFunction = dedupContext.getHashFunction();
+ _serverMetrics = dedupContext.getServerMetrics();
+ _metadataTTL = dedupContext.getMetadataTTL() >= 0 ?
dedupContext.getMetadataTTL() : 0;
+ _dedupTimeColumn = dedupContext.getDedupTimeColumn();
+ if (_metadataTTL > 0) {
+ Preconditions.checkArgument(_dedupTimeColumn != null,
+ "When metadataTTL is configured, metadata time column must be
configured for dedup enabled table: %s",
+ tableNameWithType);
+ }
+ _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
+ }
+
+ @Override
+ public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment
indexSegment) {
+ throw new UnsupportedOperationException(
+ "checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment)
is " + "deprecated!");
+ }
+
+ @Override
+ public void addSegment(IndexSegment segment) {
+ if (!startOperation()) {
+ _logger.info("Skip adding segment: {} because dedup metadata manager is
already stopped",
+ segment.getSegmentName());
+ return;
+ }
+ try {
+ addOrReplaceSegment(null, segment);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while adding segment: %s of table:
%s to %s", segment.getSegmentName(),
+ _tableNameWithType, this.getClass().getSimpleName()), e);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ @Override
+ public void replaceSegment(IndexSegment oldSegment, IndexSegment newSegment)
{
+ if (!startOperation()) {
+ _logger.info("Skip replacing segment: {} with segment: {} because dedup
metadata manager is already stopped",
+ oldSegment.getSegmentName(), newSegment.getSegmentName());
+ return;
+ }
+ try {
+ addOrReplaceSegment(oldSegment, newSegment);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while replacing segment: %s with
segment: %s of table: %s in %s",
+ oldSegment.getSegmentName(), newSegment.getSegmentName(),
_tableNameWithType,
+ this.getClass().getSimpleName()), e);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ private void addOrReplaceSegment(@Nullable IndexSegment oldSegment,
IndexSegment newSegment)
+ throws IOException {
+ try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new
DedupUtils.DedupRecordInfoReader(newSegment,
+ _primaryKeyColumns, _dedupTimeColumn)) {
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader,
newSegment.getSegmentMetadata().getTotalDocs());
+ doAddOrReplaceSegment(oldSegment, newSegment, dedupRecordInfoIterator);
+ updatePrimaryKeyGauge();
+ }
+ }
+
+ /**
+ * Adds the dedup metadata for the new segment if old segment is null; or
replaces the dedup metadata for the given
+ * old segment with the new segment if the old segment is not null.
+ * @param oldSegment The old segment to replace. If null, add the new
segment.
+ * @param newSegment The new segment to add or replace.
+ * @param dedupRecordInfoIteratorOfNewSegment The iterator of dedup record
info of the new segment.
+ */
+ protected abstract void doAddOrReplaceSegment(@Nullable IndexSegment
oldSegment, IndexSegment newSegment,
+ Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment);
+
+ @Override
+ public void removeSegment(IndexSegment segment) {
+ if (!startOperation()) {
+ _logger.info("Skip removing segment: {} because metadata manager is
already stopped", segment.getSegmentName());
+ return;
+ }
+ try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new
DedupUtils.DedupRecordInfoReader(segment,
+ _primaryKeyColumns, _dedupTimeColumn)) {
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
+ doRemoveSegment(segment, dedupRecordInfoIterator);
+ updatePrimaryKeyGauge();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while removing segment: %s of table:
%s from %s", segment.getSegmentName(),
+ _tableNameWithType, this.getClass().getSimpleName()), e);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ protected abstract void doRemoveSegment(IndexSegment segment,
Iterator<DedupRecordInfo> dedupRecordInfoIterator);
+
+ @Override
+ public void removeExpiredPrimaryKeys() {
+ if (!startOperation()) {
+ _logger.info("Skip removing expired primary keys because metadata
manager is already stopped");
+ return;
+ }
+ try {
+ long startTime = System.currentTimeMillis();
+ doRemoveExpiredPrimaryKeys();
+ long duration = System.currentTimeMillis() - startTime;
+ _serverMetrics.addTimedTableValue(_tableNameWithType,
ServerTimer.DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS,
+ duration, TimeUnit.MILLISECONDS);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ /**
+ * Removes all primary keys that have dedup time smaller than
(largestSeenDedupTime - TTL).
+ */
+ protected abstract void doRemoveExpiredPrimaryKeys();
+
+ protected synchronized boolean startOperation() {
+ if (_stopped || _numPendingOperations == 0) {
+ return false;
+ }
+ _numPendingOperations++;
+ return true;
+ }
+
+ protected synchronized void finishOperation() {
+ _numPendingOperations--;
+ if (_numPendingOperations == 0) {
+ notifyAll();
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (_stopped) {
+ _logger.warn("Metadata manager is already stopped");
+ return;
+ }
+ _stopped = true;
+ _numPendingOperations--;
+ _logger.info("Stopped the metadata manager with {} pending operations,
current primary key count: {}",
+ _numPendingOperations, getNumPrimaryKeys());
+ }
+
+ @Override
+ public synchronized void close()
+ throws IOException {
+ Preconditions.checkState(_stopped, "Must stop the metadata manager before
closing it");
+ if (_closed) {
+ _logger.warn("Metadata manager is already closed");
+ return;
+ }
+ _closed = true;
+ _logger.info("Closing the metadata manager");
+ while (_numPendingOperations != 0) {
+ _logger.info("Waiting for {} pending operations to finish",
_numPendingOperations);
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ String.format("Interrupted while waiting for %d pending operations
to finish", _numPendingOperations), e);
+ }
+ }
+ doClose();
+ // We don't remove the segment from the metadata manager when
+ // it's closed. This was done to make table deletion faster. Since we
don't remove the segment, we never decrease
+ // the primary key count. So, we set the primary key count to 0 here.
+ updatePrimaryKeyGauge(0);
+ _logger.info("Closed the metadata manager");
+ }
+
+ protected abstract long getNumPrimaryKeys();
+
+ protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+ numPrimaryKeys);
+ }
+
+ protected void updatePrimaryKeyGauge() {
+ updatePrimaryKeyGauge(getNumPrimaryKeys());
+ }
+
+ protected void doClose()
+ throws IOException {
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index 8e1bac69d1..80639ebd5e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.dedup;
import com.google.common.base.Preconditions;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,32 +27,51 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.DedupConfig;
-import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-abstract class BaseTableDedupMetadataManager implements
TableDedupMetadataManager {
+public abstract class BaseTableDedupMetadataManager implements
TableDedupMetadataManager {
protected final Map<Integer, PartitionDedupMetadataManager>
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
protected String _tableNameWithType;
- protected List<String> _primaryKeyColumns;
- protected ServerMetrics _serverMetrics;
- protected HashFunction _hashFunction;
+ protected DedupContext _dedupContext;
@Override
public void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager,
ServerMetrics serverMetrics) {
_tableNameWithType = tableConfig.getTableName();
- _primaryKeyColumns = schema.getPrimaryKeyColumns();
- Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
+ List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
+ Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns),
"Primary key columns must be configured for dedup enabled table: %s",
_tableNameWithType);
- _serverMetrics = serverMetrics;
-
DedupConfig dedupConfig = tableConfig.getDedupConfig();
Preconditions.checkArgument(dedupConfig != null, "Dedup must be enabled
for table: %s", _tableNameWithType);
- _hashFunction = dedupConfig.getHashFunction();
+ double metadataTTL = dedupConfig.getMetadataTTL();
+ String dedupTimeColumn = dedupConfig.getDedupTimeColumn();
+ if (dedupTimeColumn == null) {
+ dedupTimeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ }
+ if (metadataTTL > 0) {
+ Preconditions.checkArgument(dedupTimeColumn != null,
+ "When metadataTTL is configured, metadata time column or time column
must be configured for "
+ + "dedup enabled table: %s", _tableNameWithType);
+ }
+
+ DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+ dedupContextBuider
+ .setTableConfig(tableConfig)
+ .setSchema(schema)
+ .setPrimaryKeyColumns(primaryKeyColumns)
+ .setHashFunction(dedupConfig.getHashFunction())
+ .setMetadataTTL(metadataTTL)
+ .setDedupTimeColumn(dedupTimeColumn)
+ .setTableIndexDir(tableDataManager.getTableDataDir())
+ .setTableDataManager(tableDataManager)
+ .setServerMetrics(serverMetrics);
+ _dedupContext = dedupContextBuider.build();
+
+ initCustomVariables();
}
public PartitionDedupMetadataManager getOrCreatePartitionManager(int
partitionId) {
@@ -62,4 +82,25 @@ abstract class BaseTableDedupMetadataManager implements
TableDedupMetadataManage
* Create PartitionDedupMetadataManager for given partition id.
*/
abstract protected PartitionDedupMetadataManager
createPartitionDedupMetadataManager(Integer partitionId);
+
+ /**
+ * Can be overridden to initialize custom variables after other variables
are set
+ */
+ protected void initCustomVariables() {
+ }
+
+ @Override
+ public void stop() {
+ for (PartitionDedupMetadataManager metadataManager :
_partitionMetadataManagerMap.values()) {
+ metadataManager.stop();
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ for (PartitionDedupMetadataManager metadataManager :
_partitionMetadataManagerMap.values()) {
+ metadataManager.close();
+ }
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index a8925e2200..033611160c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -19,106 +19,121 @@
package org.apache.pinot.segment.local.dedup;
import com.google.common.annotations.VisibleForTesting;
-import java.util.HashMap;
+import com.google.common.util.concurrent.AtomicDouble;
+import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ServerGauge;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.ByteArray;
-class ConcurrentMapPartitionDedupMetadataManager implements
PartitionDedupMetadataManager {
- private final String _tableNameWithType;
- private final List<String> _primaryKeyColumns;
- private final int _partitionId;
- private final ServerMetrics _serverMetrics;
- private final HashFunction _hashFunction;
+class ConcurrentMapPartitionDedupMetadataManager extends
BasePartitionDedupMetadataManager {
@VisibleForTesting
- final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new
ConcurrentHashMap<>();
+ final AtomicDouble _largestSeenTime = new AtomicDouble(0);
+ @VisibleForTesting
+ final ConcurrentHashMap<Object, Pair<IndexSegment, Double>>
_primaryKeyToSegmentAndTimeMap =
+ new ConcurrentHashMap<>();
- public ConcurrentMapPartitionDedupMetadataManager(String tableNameWithType,
List<String> primaryKeyColumns,
- int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction)
{
- _tableNameWithType = tableNameWithType;
- _primaryKeyColumns = primaryKeyColumns;
- _partitionId = partitionId;
- _serverMetrics = serverMetrics;
- _hashFunction = hashFunction;
+ protected ConcurrentMapPartitionDedupMetadataManager(String
tableNameWithType, int partitionId,
+ DedupContext dedupContext) {
+ super(tableNameWithType, partitionId, dedupContext);
}
- public void addSegment(IndexSegment segment) {
- // Add all PKs to _primaryKeyToSegmentMap
- Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
- while (primaryKeyIterator.hasNext()) {
- PrimaryKey pk = primaryKeyIterator.next();
- _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction),
segment);
+ @Override
+ protected void doAddOrReplaceSegment(IndexSegment oldSegment, IndexSegment
newSegment,
+ Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
+ String segmentName = newSegment.getSegmentName();
+ while (dedupRecordInfoIteratorOfNewSegment.hasNext()) {
+ DedupRecordInfo dedupRecordInfo =
dedupRecordInfoIteratorOfNewSegment.next();
+ double dedupTime = dedupRecordInfo.getDedupTime();
+ _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupTime));
+
_primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(),
_hashFunction),
+ (primaryKey, segmentAndTime) -> {
+ if (segmentAndTime == null) {
+ return Pair.of(newSegment, dedupTime);
+ }
+ // when oldSegment is null, it means we are adding a new segment
+ // when oldSegment is not null, it means we are replacing an
existing segment
+ if (oldSegment == null) {
+ _logger.warn("When adding a new segment: record in segment: {}
with primary key: {} and dedup "
+ + "time: {} already exists in segment: {} with dedup
time: {}", segmentName,
+ dedupRecordInfo.getPrimaryKey(), dedupTime,
segmentAndTime.getLeft().getSegmentName(),
+ segmentAndTime.getRight());
+ } else {
+ if (segmentAndTime.getLeft() != oldSegment) {
+ _logger.warn("When replacing a segment: record in segment: {}
with primary key: {} and dedup "
+ + "time: {} exists in segment: {} (but not the
segment: {} to replace) with dedup time: {}",
+ segmentName, dedupRecordInfo.getPrimaryKey(), dedupTime,
segmentAndTime.getLeft().getSegmentName(),
+ oldSegment.getSegmentName(), segmentAndTime.getRight());
+ }
+ }
+ // When dedup time is the same, we always keep the latest segment
+ // This will handle segment replacement case correctly - a typical
case is when a mutable segment is
+ // replaced by an immutable segment
+ if (segmentAndTime.getRight() <= dedupTime) {
+ return Pair.of(newSegment, dedupTime);
+ }
+ return segmentAndTime;
+ });
}
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
- _primaryKeyToSegmentMap.size());
}
- public void removeSegment(IndexSegment segment) {
- // TODO(saurabh): Explain reload scenario here
- Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
- while (primaryKeyIterator.hasNext()) {
- PrimaryKey pk = primaryKeyIterator.next();
- _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk,
_hashFunction), (primaryKey, currentSegment) -> {
- if (currentSegment == segment) {
- return null;
- } else {
- return currentSegment;
- }
- });
+ @Override
+ protected void doRemoveSegment(IndexSegment segment,
Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+ while (dedupRecordInfoIterator.hasNext()) {
+ DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+ _primaryKeyToSegmentAndTimeMap.computeIfPresent(
+ HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(),
_hashFunction), (primaryKey, segmentAndTime) -> {
+ // do not need to compare dedup time because we are removing the
segment
+ if (segmentAndTime.getLeft() == segment) {
+ return null;
+ } else {
+ return segmentAndTime;
+ }
+ });
}
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
- _primaryKeyToSegmentMap.size());
}
- @VisibleForTesting
- Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
- Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
- for (String primaryKeyColumn : _primaryKeyColumns) {
- columnToReaderMap.put(primaryKeyColumn, new
PinotSegmentColumnReader(segment, primaryKeyColumn));
+ @Override
+ protected void doRemoveExpiredPrimaryKeys() {
+ if (_metadataTTL > 0) {
+ double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL;
+ _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry ->
entry.getValue().getRight() < smallestTimeToKeep);
}
- int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
- int numPrimaryKeyColumns = _primaryKeyColumns.size();
- return new Iterator<PrimaryKey>() {
- private int _docId = 0;
+ }
- @Override
- public boolean hasNext() {
- return _docId < numTotalDocs;
+ @Override
+ public boolean checkRecordPresentOrUpdate(DedupRecordInfo dedupRecordInfo,
IndexSegment indexSegment) {
+ if (!startOperation()) {
+ _logger.info("Skip adding record to {} because metadata manager is
already stopped",
+ indexSegment.getSegmentName());
+ return true;
+ }
+ try {
+ _largestSeenTime.getAndUpdate(time -> Math.max(time,
dedupRecordInfo.getDedupTime()));
+ boolean present = _primaryKeyToSegmentAndTimeMap.putIfAbsent(
+ HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(),
_hashFunction),
+ Pair.of(indexSegment, dedupRecordInfo.getDedupTime())) != null;
+ if (!present) {
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType,
_partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+ _primaryKeyToSegmentAndTimeMap.size());
}
+ return present;
+ } finally {
+ finishOperation();
+ }
+ }
- @Override
- public PrimaryKey next() {
- Object[] values = new Object[numPrimaryKeyColumns];
- for (int i = 0; i < numPrimaryKeyColumns; i++) {
- Object value =
columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
- if (value instanceof byte[]) {
- value = new ByteArray((byte[]) value);
- }
- values[i] = value;
- }
- _docId++;
- return new PrimaryKey(values);
- }
- };
+ @Override
+ protected long getNumPrimaryKeys() {
+ return _primaryKeyToSegmentAndTimeMap.size();
}
- public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment
indexSegment) {
- boolean present =
- _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk,
_hashFunction), indexSegment) != null;
- if (!present) {
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType,
_partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
- _primaryKeyToSegmentMap.size());
- }
- return present;
+ @Override
+ protected void doClose()
+ throws IOException {
+ _primaryKeyToSegmentAndTimeMap.clear();
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
index 0f3de3aa96..4bb7929ffe 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
@@ -18,10 +18,9 @@
*/
package org.apache.pinot.segment.local.dedup;
-class ConcurrentMapTableDedupMetadataManager extends
BaseTableDedupMetadataManager {
+class ConcurrentMapTableDedupMetadataManager extends
BaseTableDedupMetadataManager {
protected PartitionDedupMetadataManager
createPartitionDedupMetadataManager(Integer partitionId) {
- return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType,
_primaryKeyColumns, partitionId,
- _serverMetrics, _hashFunction);
+ return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType,
partitionId, _dedupContext);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
new file mode 100644
index 0000000000..a523f26957
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class DedupContext {
+ private final TableConfig _tableConfig;
+ private final Schema _schema;
+ private final List<String> _primaryKeyColumns;
+ private final HashFunction _hashFunction;
+ private final double _metadataTTL;
+ private final String _dedupTimeColumn;
+ private final File _tableIndexDir;
+ private final TableDataManager _tableDataManager;
+ private final ServerMetrics _serverMetrics;
+
+ private DedupContext(TableConfig tableConfig, Schema schema, List<String>
primaryKeyColumns,
+ HashFunction hashFunction, double metadataTTL, String dedupTimeColumn,
File tableIndexDir,
+ TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+ _tableConfig = tableConfig;
+ _schema = schema;
+ _primaryKeyColumns = primaryKeyColumns;
+ _hashFunction = hashFunction;
+ _metadataTTL = metadataTTL;
+ _dedupTimeColumn = dedupTimeColumn;
+ _tableIndexDir = tableIndexDir;
+ _tableDataManager = tableDataManager;
+ _serverMetrics = serverMetrics;
+ }
+
+ public TableConfig getTableConfig() {
+ return _tableConfig;
+ }
+
+ public Schema getSchema() {
+ return _schema;
+ }
+
+ public List<String> getPrimaryKeyColumns() {
+ return _primaryKeyColumns;
+ }
+
+ public HashFunction getHashFunction() {
+ return _hashFunction;
+ }
+
+ public double getMetadataTTL() {
+ return _metadataTTL;
+ }
+
+ public String getDedupTimeColumn() {
+ return _dedupTimeColumn;
+ }
+
+ public File getTableIndexDir() {
+ return _tableIndexDir;
+ }
+
+ public TableDataManager getTableDataManager() {
+ return _tableDataManager;
+ }
+
+ public ServerMetrics getServerMetrics() {
+ return _serverMetrics;
+ }
+
+ public static class Builder {
+ private TableConfig _tableConfig;
+ private Schema _schema;
+ private List<String> _primaryKeyColumns;
+ private HashFunction _hashFunction;
+ private double _metadataTTL;
+ private String _dedupTimeColumn;
+ private File _tableIndexDir;
+ private TableDataManager _tableDataManager;
+ private ServerMetrics _serverMetrics;
+
+ public Builder setTableConfig(TableConfig tableConfig) {
+ _tableConfig = tableConfig;
+ return this;
+ }
+
+ public Builder setSchema(Schema schema) {
+ _schema = schema;
+ return this;
+ }
+
+ public Builder setPrimaryKeyColumns(List<String> primaryKeyColumns) {
+ _primaryKeyColumns = primaryKeyColumns;
+ return this;
+ }
+
+ public Builder setHashFunction(HashFunction hashFunction) {
+ _hashFunction = hashFunction;
+ return this;
+ }
+
+ public Builder setMetadataTTL(double metadataTTL) {
+ _metadataTTL = metadataTTL;
+ return this;
+ }
+
+ public Builder setDedupTimeColumn(String deupTimeColumn) {
+ _dedupTimeColumn = deupTimeColumn;
+ return this;
+ }
+
+ public Builder setTableIndexDir(File tableIndexDir) {
+ _tableIndexDir = tableIndexDir;
+ return this;
+ }
+
+ public Builder setTableDataManager(TableDataManager tableDataManager) {
+ _tableDataManager = tableDataManager;
+ return this;
+ }
+
+ public Builder setServerMetrics(ServerMetrics serverMetrics) {
+ _serverMetrics = serverMetrics;
+ return this;
+ }
+
+ public DedupContext build() {
+ Preconditions.checkState(_tableConfig != null, "Table config must be
set");
+ Preconditions.checkState(_schema != null, "Schema must be set");
+ Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns),
"Primary key columns must be set");
+ Preconditions.checkState(_hashFunction != null, "Hash function must be
set");
+ Preconditions.checkState(_tableIndexDir != null, "Table index directory
must be set");
+ return new DedupContext(_tableConfig, _schema, _primaryKeyColumns,
_hashFunction, _metadataTTL, _dedupTimeColumn,
+ _tableIndexDir, _tableDataManager, _serverMetrics);
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupRecordInfo.java
similarity index 62%
copy from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupRecordInfo.java
index 0f3de3aa96..191605076d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupRecordInfo.java
@@ -18,10 +18,27 @@
*/
package org.apache.pinot.segment.local.dedup;
-class ConcurrentMapTableDedupMetadataManager extends
BaseTableDedupMetadataManager {
+import org.apache.pinot.spi.data.readers.PrimaryKey;
- protected PartitionDedupMetadataManager
createPartitionDedupMetadataManager(Integer partitionId) {
- return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType,
_primaryKeyColumns, partitionId,
- _serverMetrics, _hashFunction);
+
+public class DedupRecordInfo {
+ private final PrimaryKey _primaryKey;
+ private final double _dedupTime;
+
+ public DedupRecordInfo(PrimaryKey primaryKey, double dedupTime) {
+ _primaryKey = primaryKey;
+ _dedupTime = dedupTime;
+ }
+
+ public DedupRecordInfo(PrimaryKey primaryKey) {
+ this(primaryKey, Double.MIN_VALUE);
+ }
+
+ public PrimaryKey getPrimaryKey() {
+ return _primaryKey;
+ }
+
+ public double getDedupTime() {
+ return _dedupTime;
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupUtils.java
new file mode 100644
index 0000000000..2b9c1c3cda
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+
+
+public class DedupUtils {
+ private DedupUtils() {
+ }
+
+ public static class DedupRecordInfoReader implements Closeable {
+ private final PrimaryKeyReader _primaryKeyReader;
+ private final PinotSegmentColumnReader _dedupTimeColumnReader;
+
+ public DedupRecordInfoReader(IndexSegment segment, List<String>
primaryKeyColumns,
+ @Nullable String dedupTimeColumn) {
+ _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
+ if (dedupTimeColumn != null) {
+ _dedupTimeColumnReader = new PinotSegmentColumnReader(segment,
dedupTimeColumn);
+ } else {
+ _dedupTimeColumnReader = null;
+ }
+ }
+
+ @VisibleForTesting
+ public DedupRecordInfoReader(PrimaryKeyReader primaryKeyReader,
+ @Nullable PinotSegmentColumnReader dedupTimeColumnReader) {
+ _primaryKeyReader = primaryKeyReader;
+ _dedupTimeColumnReader = dedupTimeColumnReader;
+ }
+
+ public DedupRecordInfo getDedupRecordInfo(int docId) {
+ PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
+ double dedupTime =
+ (_dedupTimeColumnReader != null) ? ((Number)
_dedupTimeColumnReader.getValue(docId)).doubleValue()
+ : Double.MIN_VALUE;
+ return new DedupRecordInfo(primaryKey, dedupTime);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _primaryKeyReader.close();
+ if (_dedupTimeColumnReader != null) {
+ _dedupTimeColumnReader.close();
+ }
+ }
+ }
+
+ /**
+ * Returns an iterator of {@link DedupRecordInfo} for all the documents from
the segment.
+ */
+ public static Iterator<DedupRecordInfo>
getDedupRecordInfoIterator(DedupRecordInfoReader dedupRecordInfoReader,
+ int numDocs) {
+ return new Iterator<DedupRecordInfo>() {
+ private int _docId = 0;
+
+ @Override
+ public boolean hasNext() {
+ return _docId < numDocs;
+ }
+
+ @Override
+ public DedupRecordInfo next() {
+ return dedupRecordInfoReader.getDedupRecordInfo(_docId++);
+ }
+ };
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 82dfc9d5ed..835ce6dfa7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -18,24 +18,58 @@
*/
package org.apache.pinot.segment.local.dedup;
+import java.io.Closeable;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-public interface PartitionDedupMetadataManager {
+public interface PartitionDedupMetadataManager extends Closeable {
/**
* Initializes the dedup metadata for the given immutable segment.
*/
- public void addSegment(IndexSegment segment);
+ void addSegment(IndexSegment segment);
+
+ /**
+ * Replaces the dedup metadata for the given old segment with the new
segment.
+ */
+ default void replaceSegment(IndexSegment oldSegment, IndexSegment
newSegment) {
+ // since this is a newly added method, by default, add the new segment to
keep backward compatibility
+ addSegment(newSegment);
+ }
/**
* Removes the dedup metadata for the given segment.
*/
- public void removeSegment(IndexSegment segment);
+ void removeSegment(IndexSegment segment);
+
+ /**
+ * Remove the expired primary keys from the metadata when TTL is enabled.
+ */
+ void removeExpiredPrimaryKeys();
/**
- * Add the primary key to the given segment to the dedup matadata if it was
absent.
- * Returns true if the key was already present.
+ * Add the primary key to the given segment to the dedup matadata if it is
absent.
+ * Returns true if the key was already present, i.e., the new record
associated with the given {@link PrimaryKey}
+ * is a duplicate and should be skipped/dropped.
*/
+ @Deprecated
boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment);
+
+ /**
+ * Add the primary key to the given segment to the dedup matadata if it is
absent and with in the retention time.
+ * Returns true if the key was already present, i.e., the new record
associated with the given {@link DedupRecordInfo}
+ * is a duplicate and should be skipped/dropped.
+ * @param dedupRecordInfo The primary key and the dedup time.
+ * @param indexSegment The segment to which the record belongs.
+ * @return true if the key was already present, i.e., the new record
associated with the given {@link DedupRecordInfo}
+ * is a duplicate and should be skipped/dropped.
+ */
+ default boolean checkRecordPresentOrUpdate(DedupRecordInfo dedupRecordInfo,
IndexSegment indexSegment) {
+ return checkRecordPresentOrUpdate(dedupRecordInfo.getPrimaryKey(),
indexSegment);
+ }
+
+ /**
+ * Stops the metadata manager. After invoking this method, no access to the
metadata will be accepted.
+ */
+ void stop();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index aa900f75db..5c0bd1d830 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.segment.local.dedup;
+import java.io.Closeable;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-public interface TableDedupMetadataManager {
+public interface TableDedupMetadataManager extends Closeable {
/**
* Initialize TableDedupMetadataManager.
*/
@@ -34,4 +35,9 @@ public interface TableDedupMetadataManager {
* Create a new PartitionDedupMetadataManager if not present already,
otherwise return existing one.
*/
PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId);
+
+ /**
+ * Stops the metadata manager. After invoking this method, no access to the
metadata will be accepted.
+ */
+ void stop();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 847f589f2a..730d7a02ea 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -47,6 +47,7 @@ import
org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.dedup.DedupRecordInfo;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
@@ -158,6 +159,7 @@ public class MutableSegmentImpl implements MutableSegment {
private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
+ private final String _dedupTimeColumn;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final List<String> _upsertComparisonColumns;
@@ -354,6 +356,11 @@ public class MutableSegmentImpl implements MutableSegment {
}
_partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
+ if (_partitionDedupMetadataManager != null) {
+ _dedupTimeColumn = config.getDedupTimeColumn() == null ? _timeColumnName
: config.getDedupTimeColumn();
+ } else {
+ _dedupTimeColumn = null;
+ }
_partitionUpsertMetadataManager =
config.getPartitionUpsertMetadataManager();
if (_partitionUpsertMetadataManager != null) {
@@ -469,8 +476,8 @@ public class MutableSegmentImpl implements MutableSegment {
int numDocsIndexed = _numDocsIndexed;
if (isDedupEnabled()) {
- PrimaryKey primaryKey =
row.getPrimaryKey(_schema.getPrimaryKeyColumns());
- if
(_partitionDedupMetadataManager.checkRecordPresentOrUpdate(primaryKey, this)) {
+ DedupRecordInfo dedupRecordInfo = getDedupRecordInfo(row);
+ if
(_partitionDedupMetadataManager.checkRecordPresentOrUpdate(dedupRecordInfo,
this)) {
if (_serverMetrics != null) {
_serverMetrics.addMeteredTableValue(_realtimeTableName,
ServerMeter.REALTIME_DEDUP_DROPPED, 1);
}
@@ -536,6 +543,16 @@ public class MutableSegmentImpl implements MutableSegment {
return _partitionDedupMetadataManager != null;
}
+ private DedupRecordInfo getDedupRecordInfo(GenericRow row) {
+ PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+ // it is okay not having dedup time column if metadata ttl is not enabled
+ if (_dedupTimeColumn == null) {
+ return new DedupRecordInfo(primaryKey);
+ }
+ double dedupTime = ((Number) row.getValue(_dedupTimeColumn)).doubleValue();
+ return new DedupRecordInfo(primaryKey, dedupTime);
+ }
+
private RecordInfo getRecordInfo(GenericRow row, int docId) {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
Comparable comparisonValue = getComparisonValue(row);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 956135f7ca..2d5e7d10d2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -65,6 +65,7 @@ public class RealtimeSegmentConfig {
private final String _upsertOutOfOrderRecordColumn;
private final boolean _upsertDropOutOfOrderRecord;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private final String _dedupTimeColumn;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final String _consumerDir;
private final List<FieldConfig> _fieldConfigList;
@@ -78,7 +79,7 @@ public class RealtimeSegmentConfig {
PartitionFunction partitionFunction, int partitionId, boolean
aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir, UpsertConfig.Mode upsertMode, List<String>
upsertComparisonColumns,
String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn,
boolean upsertDropOutOfOrderRecord,
- PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager, String
dedupTimeColumn,
PartitionDedupMetadataManager partitionDedupMetadataManager,
List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
_tableNameWithType = tableNameWithType;
@@ -105,6 +106,7 @@ public class RealtimeSegmentConfig {
_upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
_upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
+ _dedupTimeColumn = dedupTimeColumn;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_fieldConfigList = fieldConfigList;
_ingestionAggregationConfigs = ingestionAggregationConfigs;
@@ -210,6 +212,10 @@ public class RealtimeSegmentConfig {
return _partitionUpsertMetadataManager;
}
+ public String getDedupTimeColumn() {
+ return _dedupTimeColumn;
+ }
+
public PartitionDedupMetadataManager getPartitionDedupMetadataManager() {
return _partitionDedupMetadataManager;
}
@@ -247,6 +253,7 @@ public class RealtimeSegmentConfig {
private String _upsertOutOfOrderRecordColumn;
private boolean _upsertDropOutOfOrderRecord;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private String _dedupTimeColumn;
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
private List<FieldConfig> _fieldConfigList;
private List<AggregationConfig> _ingestionAggregationConfigs;
@@ -401,6 +408,11 @@ public class RealtimeSegmentConfig {
return this;
}
+ public Builder setDedupTimeColumn(String dedupTimeColumn) {
+ _dedupTimeColumn = dedupTimeColumn;
+ return this;
+ }
+
public Builder
setPartitionDedupMetadataManager(PartitionDedupMetadataManager
partitionDedupMetadataManager) {
_partitionDedupMetadataManager = partitionDedupMetadataManager;
return this;
@@ -427,7 +439,7 @@ public class RealtimeSegmentConfig {
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction,
_partitionId, _aggregateMetrics,
_nullHandlingEnabled, _consumerDir, _upsertMode,
_upsertComparisonColumns, _upsertDeleteRecordColumn,
_upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
- _partitionUpsertMetadataManager, _partitionDedupMetadataManager,
_fieldConfigList,
+ _partitionUpsertMetadataManager, _dedupTimeColumn,
_partitionDedupMetadataManager, _fieldConfigList,
_ingestionAggregationConfigs);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PrimaryKeyReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PrimaryKeyReader.java
new file mode 100644
index 0000000000..551d242b7c
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PrimaryKeyReader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+public class PrimaryKeyReader implements Closeable {
+ public final List<PinotSegmentColumnReader> _primaryKeyColumnReaders;
+
+ public PrimaryKeyReader(IndexSegment segment, List<String>
primaryKeyColumns) {
+ _primaryKeyColumnReaders = new ArrayList<>(primaryKeyColumns.size());
+ for (String primaryKeyColumn : primaryKeyColumns) {
+ _primaryKeyColumnReaders.add(new PinotSegmentColumnReader(segment,
primaryKeyColumn));
+ }
+ }
+
+ public PrimaryKey getPrimaryKey(int docId) {
+ int numPrimaryKeys = _primaryKeyColumnReaders.size();
+ Object[] values = new Object[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
+ }
+ return new PrimaryKey(values);
+ }
+
+ public void getPrimaryKey(int docId, PrimaryKey buffer) {
+ Object[] values = buffer.getValues();
+ int numPrimaryKeys = values.length;
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ for (PinotSegmentColumnReader primaryKeyColumnReader :
_primaryKeyColumnReaders) {
+ primaryKeyColumnReader.close();
+ }
+ }
+
+ private static Object getValue(PinotSegmentColumnReader columnReader, int
docId) {
+ Object value = columnReader.getValue(docId);
+ return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 9ae0c5b746..6951fc8197 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -65,6 +65,7 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -740,8 +741,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
- try (
- UpsertUtils.PrimaryKeyReader primaryKeyReader = new
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
+ try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment,
_primaryKeyColumns)) {
removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
} catch (Exception e) {
throw new RuntimeException(
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 7e8f510737..7054b1c030 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -33,6 +33,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMeter;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -187,7 +188,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
long startTimeMs = System.currentTimeMillis();
try (
- UpsertUtils.PrimaryKeyReader primaryKeyReader = new
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
+ PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment,
_primaryKeyColumns)) {
removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader,
segment.getSegmentMetadata().getTotalDocs()));
} catch (Exception e) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 47d6a72743..3086da1969 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -20,11 +20,11 @@ package org.apache.pinot.segment.local.upsert;
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.spi.IndexSegment;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -161,48 +161,10 @@ public class UpsertUtils {
throws IOException {
_primaryKeyReader.close();
_comparisonColumnReader.close();
- }
- }
-
- public static class PrimaryKeyReader implements Closeable {
- public final List<PinotSegmentColumnReader> _primaryKeyColumnReaders;
-
- public PrimaryKeyReader(IndexSegment segment, List<String>
primaryKeyColumns) {
- _primaryKeyColumnReaders = new ArrayList<>(primaryKeyColumns.size());
- for (String primaryKeyColumn : primaryKeyColumns) {
- _primaryKeyColumnReaders.add(new PinotSegmentColumnReader(segment,
primaryKeyColumn));
- }
- }
-
- public PrimaryKey getPrimaryKey(int docId) {
- int numPrimaryKeys = _primaryKeyColumnReaders.size();
- Object[] values = new Object[numPrimaryKeys];
- for (int i = 0; i < numPrimaryKeys; i++) {
- values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
- }
- return new PrimaryKey(values);
- }
-
- public void getPrimaryKey(int docId, PrimaryKey buffer) {
- Object[] values = buffer.getValues();
- int numPrimaryKeys = values.length;
- for (int i = 0; i < numPrimaryKeys; i++) {
- values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
+ if (_deleteRecordColumnReader != null) {
+ _deleteRecordColumnReader.close();
}
}
-
- @Override
- public void close()
- throws IOException {
- for (PinotSegmentColumnReader primaryKeyColumnReader :
_primaryKeyColumnReaders) {
- primaryKeyColumnReader.close();
- }
- }
- }
-
- static Object getValue(PinotSegmentColumnReader columnReader, int docId) {
- Object value = columnReader.getValue(docId);
- return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
}
public interface ComparisonColumnReader extends Closeable {
@@ -246,7 +208,7 @@ public class UpsertUtils {
PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
Comparable comparisonValue = null;
if (!columnReader.isNull(docId)) {
- comparisonValue = (Comparable) UpsertUtils.getValue(columnReader,
docId);
+ comparisonValue = (Comparable) getValue(columnReader, docId);
}
comparisonColumns[i] = comparisonValue;
}
@@ -254,6 +216,11 @@ public class UpsertUtils {
return new ComparisonColumns(comparisonColumns,
ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
}
+ private static Object getValue(PinotSegmentColumnReader columnReader, int
docId) {
+ Object value = columnReader.getValue(docId);
+ return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
+ }
+
@Override
public void close()
throws IOException {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
new file mode 100644
index 0000000000..6349caac74
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
+
+
+public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
+ private static final int METADATA_TTL = 10000;
+ private ConcurrentMapPartitionDedupMetadataManager _metadataManager;
+
+ @BeforeMethod
+ public void setUp() {
+ DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+
dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE)
+ .setMetadataTTL(METADATA_TTL).setDedupTimeColumn("dedupTimeColumn")
+
.setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
+ .setServerMetrics(mock(ServerMetrics.class));
+ DedupContext dedupContext = dedupContextBuider.build();
+ _metadataManager =
+ new
ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME,
0, dedupContext);
+ }
+
+ @Test
+ public void creatingMetadataManagerThrowsExceptions() {
+ DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+
dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE).setMetadataTTL(1)
+ .setDedupTimeColumn(null).setTableIndexDir(mock(File.class))
+
.setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class));
+ DedupContext dedupContext = dedupContextBuider.build();
+ assertThrows(IllegalArgumentException.class,
+ () -> new
ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME,
0,
+ dedupContext));
+ }
+
+ @Test
+ public void verifyRemoveExpiredPrimaryKeys() {
+ IndexSegment segment = Mockito.mock(IndexSegment.class);
+ for (int i = 0; i < 20; i++) {
+ double time = i * 1000;
+ Object primaryKeyKey =
HashUtils.hashPrimaryKey(DedupTestUtils.getPrimaryKey(i), HashFunction.NONE);
+
_metadataManager._primaryKeyToSegmentAndTimeMap.computeIfAbsent(primaryKeyKey,
k -> Pair.of(segment, time));
+ }
+ _metadataManager._largestSeenTime.set(19000);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 20);
+ verifyInMemoryState(0, 20, segment);
+
+ _metadataManager.removeExpiredPrimaryKeys();
+ assertEquals(_metadataManager.getNumPrimaryKeys(), 11);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
+ verifyInMemoryState(9, 11, segment);
+ }
+
+ @Test
+ public void verifyAddRemoveTheSameSegment() {
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
generateDedupRecordInfoReader(10, 0);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+ IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment,
dedupRecordInfoIterator);
+ verifyInitialSegmentAddition(segment);
+
+ dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+ _metadataManager.doRemoveSegment(segment, dedupRecordInfoIterator);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0);
+ assertEquals(_metadataManager._largestSeenTime.get(), 9000);
+ }
+
+ @Test
+ public void verifyAddingTwoSegmentWithSamePrimaryKeys() {
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
generateDedupRecordInfoReader(10, 0);
+ IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment,
dedupRecordInfoIterator);
+ verifyInitialSegmentAddition(segment);
+
+ IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+ dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+ _metadataManager.doAddOrReplaceSegment(segment, segment2,
dedupRecordInfoIterator);
+ verifyInitialSegmentAddition(segment2);
+ }
+
+ @Test
+ public void verifyRemoveAnotherSegmentWithTheSamePrimaryKeys() {
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
generateDedupRecordInfoReader(10, 0);
+ IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment,
dedupRecordInfoIterator);
+ verifyInitialSegmentAddition(segment);
+
+ IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+ dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+ _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator);
+ verifyInitialSegmentAddition(segment);
+ }
+
+ private void verifyInitialSegmentAddition(IndexSegment segment) {
+ assertEquals(_metadataManager._largestSeenTime.get(), 9000);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
+ verifyInMemoryState(0, 10, segment);
+ }
+
+ private void verifyInMemoryState(int startPrimaryKeyId, int recordCount,
IndexSegment segment) {
+ for (int primaryKeyId = startPrimaryKeyId; primaryKeyId <
startPrimaryKeyId + recordCount; primaryKeyId++) {
+ PrimaryKey primaryKey = DedupTestUtils.getPrimaryKey(primaryKeyId);
+
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKey),
+ Pair.of(segment, (double) primaryKeyId * 1000));
+ }
+ }
+
+ @Test
+ public void verifyAddTwoDifferentSegmentsRemoveEarlySegmentFirst() {
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader1 =
generateDedupRecordInfoReader(10, 0);
+ IndexSegment segment1 = DedupTestUtils.mockSegment(1, 10);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator1 =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment1,
dedupRecordInfoIterator1);
+ verifyInitialSegmentAddition(segment1);
+
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader2 =
generateDedupRecordInfoReader(10, 10);
+ IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment2,
dedupRecordInfoIterator2);
+ _metadataManager.removeExpiredPrimaryKeys();
+ assertEquals(_metadataManager.getNumPrimaryKeys(), 11);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
+ verifyInMemoryState(9, 1, segment1);
+ verifyInMemoryState(10, 10, segment2);
+ assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+ dedupRecordInfoIterator1 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+ _metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
+ verifyInMemoryState(10, 10, segment2);
+ assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+ dedupRecordInfoIterator2 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+ _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
+ assertTrue(_metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
+ assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+ }
+
+ @Test
+ public void verifyAddTwoDifferentSegmentsRemoveRecentSegmentFirst() {
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader1 =
generateDedupRecordInfoReader(10, 0);
+ IndexSegment segment1 = DedupTestUtils.mockSegment(1, 10);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator1 =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment1,
dedupRecordInfoIterator1);
+ verifyInitialSegmentAddition(segment1);
+
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader2 =
generateDedupRecordInfoReader(10, 10);
+ IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+ _metadataManager.doAddOrReplaceSegment(null, segment2,
dedupRecordInfoIterator2);
+ _metadataManager.removeExpiredPrimaryKeys();
+ assertEquals(_metadataManager.getNumPrimaryKeys(), 11);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
+ verifyInMemoryState(10, 10, segment2);
+ assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+ dedupRecordInfoIterator2 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+ _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+ verifyInMemoryState(9, 1, segment1);
+ assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+ dedupRecordInfoIterator1 =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+ _metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
+ assertTrue(_metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
+ assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+ }
+
+ @Test
+ public void verifyAddingSegmentWithDuplicatedPrimaryKeys() {
+ PrimaryKey primaryKey = DedupTestUtils.getPrimaryKey(0);
+ PrimaryKeyReader primaryKeyReader = Mockito.mock(PrimaryKeyReader.class);
+ for (int i = 0; i < 3; i++) {
+ Mockito.when(primaryKeyReader.getPrimaryKey(i)).thenReturn(primaryKey);
+ }
+ PinotSegmentColumnReader dedupTimeColumnReader =
Mockito.mock(PinotSegmentColumnReader.class);
+ Mockito.when(dedupTimeColumnReader.getValue(0)).thenReturn(1000.0);
+ Mockito.when(dedupTimeColumnReader.getValue(1)).thenReturn(15000.0);
+ Mockito.when(dedupTimeColumnReader.getValue(2)).thenReturn(25000.0);
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
+ new DedupUtils.DedupRecordInfoReader(primaryKeyReader,
dedupTimeColumnReader);
+ _metadataManager._largestSeenTime.set(20000);
+
+ ImmutableSegmentImpl immutableSegment = DedupTestUtils.mockSegment(1, 3);
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 3);
+ _metadataManager.doAddOrReplaceSegment(null, immutableSegment,
dedupRecordInfoIterator);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+ Object primaryKeyHash = HashUtils.hashPrimaryKey(primaryKey,
HashFunction.NONE);
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
+ Pair.of(immutableSegment, 25000.0));
+ assertEquals(_metadataManager._largestSeenTime.get(), 25000);
+ }
+
+ @Test
+ public void verifyAddRow() {
+ _metadataManager._largestSeenTime.set(20000);
+
+ PrimaryKey primaryKey = DedupTestUtils.getPrimaryKey(0);
+ DedupRecordInfo dedupRecordInfo = new DedupRecordInfo(primaryKey, 1000);
+ ImmutableSegmentImpl immutableSegment = DedupTestUtils.mockSegment(1, 1);
+ assertFalse(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo,
immutableSegment));
+ assertFalse(_metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
+ assertEquals(_metadataManager._largestSeenTime.get(), 20000);
+
+ Object primaryKeyHash = HashUtils.hashPrimaryKey(primaryKey,
HashFunction.NONE);
+ dedupRecordInfo = new DedupRecordInfo(primaryKey, 15000);
+ assertTrue(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo,
immutableSegment));
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
+ Pair.of(immutableSegment, 1000.0));
+ assertEquals(_metadataManager._largestSeenTime.get(), 20000);
+
+ dedupRecordInfo = new DedupRecordInfo(primaryKey, 25000);
+ assertTrue(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo,
immutableSegment));
+ assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
+ Pair.of(immutableSegment, 1000.0));
+ assertEquals(_metadataManager._largestSeenTime.get(), 25000);
+ }
+
+ private static DedupUtils.DedupRecordInfoReader
generateDedupRecordInfoReader(int numberOfDocs,
+ int startPrimaryKeyValue) {
+ PrimaryKeyReader primaryKeyReader = Mockito.mock(PrimaryKeyReader.class);
+ PinotSegmentColumnReader dedupTimeColumnReader =
Mockito.mock(PinotSegmentColumnReader.class);
+ for (int i = 0; i < numberOfDocs; i++) {
+ int primaryKeyValue = startPrimaryKeyValue + i;
+
Mockito.when(primaryKeyReader.getPrimaryKey(i)).thenReturn(DedupTestUtils.getPrimaryKey(primaryKeyValue));
+ double time = primaryKeyValue * 1000;
+ Mockito.when(dedupTimeColumnReader.getValue(i)).thenReturn(time);
+ }
+ return new DedupUtils.DedupRecordInfoReader(primaryKeyReader,
dedupTimeColumnReader);
+ }
+
+ @Test
+ public void testAddSegmentAfterStop() {
+ IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+ // throws when not stopped
+ assertThrows(RuntimeException.class, () ->
_metadataManager.addSegment(segment));
+ _metadataManager.stop();
+ // do not throw when stopped
+ _metadataManager.addSegment(segment);
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
new file mode 100644
index 0000000000..b27ec4ffb0
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
+
+
+public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
+ private ConcurrentMapPartitionDedupMetadataManager _metadataManager;
+
+ @BeforeMethod
+ public void setUp() {
+ DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+
dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE)
+
.setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
+ .setServerMetrics(mock(ServerMetrics.class));
+ DedupContext dedupContext = dedupContextBuider.build();
+ _metadataManager =
+ new
ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME,
0, dedupContext);
+ }
+
+ @Test
+ public void verifyAddRemoveSegment() {
+ HashFunction hashFunction = HashFunction.NONE;
+
+ // Add the first segment
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
generateDedupRecordInfoReader();
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+ ImmutableSegmentImpl segment1 = DedupTestUtils.mockSegment(1, 6);
+ _metadataManager.doAddOrReplaceSegment(null, segment1,
dedupRecordInfoIterator);
+
Assert.assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 3);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 0,
segment1, hashFunction);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 1,
segment1, hashFunction);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 2,
segment1, hashFunction);
+
+ // reset the iterator
+ dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+ _metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator);
+
Assert.assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0);
+ }
+
+ @Test
+ public void verifyReloadSegment() {
+ HashFunction hashFunction = HashFunction.NONE;
+
+ // Add the first segment
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
generateDedupRecordInfoReader();
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+ ImmutableSegmentImpl segment1 = DedupTestUtils.mockSegment(1, 6);
+ _metadataManager.doAddOrReplaceSegment(null, segment1,
dedupRecordInfoIterator);
+
+ // Remove another segment with same PK rows
+ // reset the iterator
+ dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+ ImmutableSegmentImpl segment2 = DedupTestUtils.mockSegment(1, 6);
+ _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator);
+
Assert.assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 3);
+
+ // Keys should still exist
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 0,
segment1, hashFunction);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 1,
segment1, hashFunction);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 2,
segment1, hashFunction);
+ }
+
+ @Test
+ public void verifyAddRow() {
+ HashFunction hashFunction = HashFunction.NONE;
+
+ // Add the first segment
+ DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
generateDedupRecordInfoReader();
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+ ImmutableSegmentImpl segment1 = DedupTestUtils.mockSegment(1, 6);
+ _metadataManager.doAddOrReplaceSegment(null, segment1,
dedupRecordInfoIterator);
+
+ // Same PK exists
+ // reset the iterator
+ dedupRecordInfoIterator =
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+ ImmutableSegmentImpl segment2 = DedupTestUtils.mockSegment(2, 6);
+ while (dedupRecordInfoIterator.hasNext()) {
+ DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+
Assert.assertTrue(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo,
segment2));
+ }
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 0,
segment1, hashFunction);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 1,
segment1, hashFunction);
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 2,
segment1, hashFunction);
+
+ // New PK
+ Assert.assertFalse(_metadataManager.checkRecordPresentOrUpdate(
+ new DedupRecordInfo(DedupTestUtils.getPrimaryKey(3), 3000),
segment2));
+ checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 3,
segment2, hashFunction);
+
+ // Same PK as the one recently ingested
+ Assert.assertTrue(_metadataManager.checkRecordPresentOrUpdate(
+ new DedupRecordInfo(DedupTestUtils.getPrimaryKey(3), 4000), segment2));
+ }
+
+ private static DedupUtils.DedupRecordInfoReader
generateDedupRecordInfoReader() {
+ PrimaryKeyReader primaryKeyReader = Mockito.mock(PrimaryKeyReader.class);
+ PinotSegmentColumnReader dedupTimeColumnReader =
Mockito.mock(PinotSegmentColumnReader.class);
+
Mockito.when(primaryKeyReader.getPrimaryKey(0)).thenReturn(DedupTestUtils.getPrimaryKey(0));
+
Mockito.when(primaryKeyReader.getPrimaryKey(1)).thenReturn(DedupTestUtils.getPrimaryKey(1));
+
Mockito.when(primaryKeyReader.getPrimaryKey(2)).thenReturn(DedupTestUtils.getPrimaryKey(2));
+
Mockito.when(primaryKeyReader.getPrimaryKey(3)).thenReturn(DedupTestUtils.getPrimaryKey(0));
+
Mockito.when(primaryKeyReader.getPrimaryKey(4)).thenReturn(DedupTestUtils.getPrimaryKey(1));
+
Mockito.when(primaryKeyReader.getPrimaryKey(5)).thenReturn(DedupTestUtils.getPrimaryKey(0));
+ for (int i = 0; i < 6; i++) {
+ Mockito.when(dedupTimeColumnReader.getValue(i)).thenReturn(i * 1000);
+ }
+ return new DedupUtils.DedupRecordInfoReader(primaryKeyReader,
dedupTimeColumnReader);
+ }
+
+ private static void checkRecordLocation(Map<Object, Pair<IndexSegment,
Double>> recordLocationMap, int keyValue,
+ IndexSegment segment, HashFunction hashFunction) {
+ IndexSegment indexSegment =
+
recordLocationMap.get(HashUtils.hashPrimaryKey(DedupTestUtils.getPrimaryKey(keyValue),
hashFunction)).getLeft();
+ assertNotNull(indexSegment);
+ assertSame(indexSegment, segment);
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/DedupTestUtils.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/DedupTestUtils.java
new file mode 100644
index 0000000000..13d19fddc6
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/DedupTestUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import org.apache.pinot.common.utils.LLCSegmentName;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class DedupTestUtils {
+ public static final String RAW_TABLE_NAME = "testTable";
+ public static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+ private DedupTestUtils() {
+ }
+
+ public static ImmutableSegmentImpl mockSegment(int sequenceNumber, int
totalDocs) {
+ // Mock the segment name
+ ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+ String segmentName = getSegmentName(sequenceNumber);
+ when(segment.getSegmentName()).thenReturn(segmentName);
+ // Mock the segment total doc
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
+ return segment;
+ }
+
+ public static String getSegmentName(int sequenceNumber) {
+ return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
+ }
+
+ public static PrimaryKey getPrimaryKey(int value) {
+ return new PrimaryKey(new Object[]{value});
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
deleted file mode 100644
index 0fed3d59dd..0000000000
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.dedup;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertSame;
-
-
-public class PartitionDedupMetadataManagerTest {
- private static final String RAW_TABLE_NAME = "testTable";
- private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
-
- @Test
- public void verifyAddRemoveSegment() {
- HashFunction hashFunction = HashFunction.NONE;
- TestMetadataManager metadataManager =
- new TestMetadataManager(REALTIME_TABLE_NAME, null, 0,
mock(ServerMetrics.class), hashFunction);
- Map<Object, IndexSegment> recordLocationMap =
metadataManager._primaryKeyToSegmentMap;
-
- // Add the first segment
- List<PrimaryKey> pkList1 = new ArrayList<>();
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(2));
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(0));
- metadataManager._primaryKeyIterator = pkList1.iterator();
- ImmutableSegmentImpl segment1 = mockSegment(1);
- metadataManager.addSegment(segment1);
- checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
-
- metadataManager._primaryKeyIterator = pkList1.iterator();
- metadataManager.removeSegment(segment1);
- Assert.assertEquals(recordLocationMap.size(), 0);
- }
-
- @Test
- public void verifyReloadSegment() {
- HashFunction hashFunction = HashFunction.NONE;
- TestMetadataManager metadataManager =
- new TestMetadataManager(REALTIME_TABLE_NAME, null, 0,
mock(ServerMetrics.class), hashFunction);
- Map<Object, IndexSegment> recordLocationMap =
metadataManager._primaryKeyToSegmentMap;
-
- // Add the first segment
- List<PrimaryKey> pkList1 = new ArrayList<>();
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(2));
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(0));
- metadataManager._primaryKeyIterator = pkList1.iterator();
- ImmutableSegmentImpl segment1 = mockSegment(1);
- metadataManager.addSegment(segment1);
-
- // Remove another segment with same PK rows
- metadataManager._primaryKeyIterator = pkList1.iterator();
- ImmutableSegmentImpl segment2 = mockSegment(1);
- metadataManager.removeSegment(segment2);
- Assert.assertEquals(recordLocationMap.size(), 3);
-
- // Keys should still exist
- checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
- }
-
- @Test
- public void verifyAddRow() {
- HashFunction hashFunction = HashFunction.NONE;
- TestMetadataManager metadataManager =
- new TestMetadataManager(REALTIME_TABLE_NAME, null, 0,
mock(ServerMetrics.class), hashFunction);
- Map<Object, IndexSegment> recordLocationMap =
metadataManager._primaryKeyToSegmentMap;
-
- // Add the first segment
- List<PrimaryKey> pkList1 = new ArrayList<>();
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(2));
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(0));
- metadataManager._primaryKeyIterator = pkList1.iterator();
- ImmutableSegmentImpl segment1 = mockSegment(1);
- metadataManager.addSegment(segment1);
-
- // Same PK exists
- ImmutableSegmentImpl segment2 = mockSegment(2);
-
Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(0),
segment2));
- checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
-
- // New PK
-
Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3),
segment2));
- checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
-
- // Same PK as the one recently ingested
-
Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3),
segment2));
- }
-
- private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
- ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
- String segmentName = getSegmentName(sequenceNumber);
- when(segment.getSegmentName()).thenReturn(segmentName);
- return segment;
- }
-
- private static String getSegmentName(int sequenceNumber) {
- return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
- }
-
- private static PrimaryKey getPrimaryKey(int value) {
- return new PrimaryKey(new Object[]{value});
- }
-
- private static void checkRecordLocation(Map<Object, IndexSegment>
recordLocationMap, int keyValue,
- IndexSegment segment, HashFunction hashFunction) {
- IndexSegment indexSegment =
recordLocationMap.get(HashUtils.hashPrimaryKey(getPrimaryKey(keyValue),
hashFunction));
- assertNotNull(indexSegment);
- assertSame(indexSegment, segment);
- }
-
- private static class TestMetadataManager extends
ConcurrentMapPartitionDedupMetadataManager {
- Iterator<PrimaryKey> _primaryKeyIterator;
-
- TestMetadataManager(String tableNameWithType, List<String>
primaryKeyColumns, int partitionId,
- ServerMetrics serverMetrics, HashFunction hashFunction) {
- super(tableNameWithType, primaryKeyColumns, partitionId, serverMetrics,
hashFunction);
- }
-
- @Override
- Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
- return _primaryKeyIterator;
- }
- }
-}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index ebc092eb2b..36c5627902 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -19,9 +19,13 @@
package org.apache.pinot.segment.local.indexsegment.mutable;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -30,6 +34,7 @@ import
org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
@@ -48,7 +53,7 @@ public class MutableSegmentDedupeTest {
private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
private MutableSegmentImpl _mutableSegmentImpl;
- private void setup(boolean dedupEnabled)
+ private void setup(boolean dedupEnabled, double metadataTTL, String
dedupTimeColumn)
throws Exception {
URL schemaResourceUrl =
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl =
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
@@ -57,11 +62,13 @@ public class MutableSegmentDedupeTest {
.setDedupConfig(new DedupConfig(dedupEnabled,
HashFunction.NONE)).build();
CompositeTransformer recordTransformer =
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
File jsonFile = new File(dataResourceUrl.getFile());
+ DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null,
null, metadataTTL, dedupTimeColumn);
PartitionDedupMetadataManager partitionDedupMetadataManager =
- (dedupEnabled) ?
getTableDedupMetadataManager(schema).getOrCreatePartitionManager(0) : null;
+ (dedupEnabled) ? getTableDedupMetadataManager(schema,
dedupConfig).getOrCreatePartitionManager(0) : null;
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
Collections.emptySet(), Collections.emptySet(),
- Collections.emptySet(), false, true, null, "secondsSinceEpoch",
null, partitionDedupMetadataManager);
+ Collections.emptySet(), false, true, null, "secondsSinceEpoch",
null, dedupConfig,
+ partitionDedupMetadataManager);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader =
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
schema.getColumnNames(), null)) {
@@ -69,30 +76,100 @@ public class MutableSegmentDedupeTest {
recordReader.next(reuse);
GenericRow transformedRow = recordTransformer.transform(reuse);
_mutableSegmentImpl.index(transformedRow, null);
+ if (dedupEnabled) {
+ partitionDedupMetadataManager.removeExpiredPrimaryKeys();
+ }
reuse.clear();
}
}
}
- private static TableDedupMetadataManager getTableDedupMetadataManager(Schema
schema) {
+ private static TableDedupMetadataManager getTableDedupMetadataManager(Schema
schema, DedupConfig dedupConfig) {
TableConfig tableConfig = Mockito.mock(TableConfig.class);
Mockito.when(tableConfig.getTableName()).thenReturn("testTable_REALTIME");
- Mockito.when(tableConfig.getDedupConfig()).thenReturn(new
DedupConfig(true, HashFunction.NONE));
- return TableDedupMetadataManagerFactory.create(tableConfig, schema,
Mockito.mock(TableDataManager.class),
+ Mockito.when(tableConfig.getDedupConfig()).thenReturn(dedupConfig);
+ SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig
+ = Mockito.mock(SegmentsValidationAndRetentionConfig.class);
+
Mockito.when(tableConfig.getValidationConfig()).thenReturn(segmentsValidationAndRetentionConfig);
+
Mockito.when(segmentsValidationAndRetentionConfig.getTimeColumnName()).thenReturn("secondsSinceEpoch");
+ TableDataManager tableDataManager = Mockito.mock(TableDataManager.class);
+
Mockito.when(tableDataManager.getTableDataDir()).thenReturn(Mockito.mock(File.class));
+ return TableDedupMetadataManagerFactory.create(tableConfig, schema,
tableDataManager,
Mockito.mock(ServerMetrics.class));
}
+ public List<Map<String, String>> loadJsonFile(String filePath) throws
IOException {
+ URL resourceUrl = this.getClass().getClassLoader().getResource(filePath);
+ if (resourceUrl == null) {
+ throw new IllegalArgumentException("File not found: " + filePath);
+ }
+ File jsonFile = new File(resourceUrl.getFile());
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readValue(jsonFile, List.class);
+ }
+
@Test
public void testDedupeEnabled()
throws Exception {
- setup(true);
+ setup(true, 0, null);
Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 2);
+ List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+ for (int i = 0; i < 2; i++) {
+ verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+ }
}
@Test
public void testDedupeDisabled()
throws Exception {
- setup(false);
+ setup(false, 0, null);
Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4);
+ List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+ for (int i = 0; i < 4; i++) {
+ verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+ }
+ }
+
+ @Test
+ public void testDedupWithMetadataTTLWithoutDedupTimeColumn()
+ throws Exception {
+ setup(true, 1000, null);
+ checkGeneratedSegmentDataWhenTableTimeColumnIsUsedAsDedupTimeColumn();
+ }
+
+ @Test
+ public void testDedupWithMetadataTTLWithTableTimeColumn()
+ throws Exception {
+ setup(true, 1000, "secondsSinceEpoch");
+ checkGeneratedSegmentDataWhenTableTimeColumnIsUsedAsDedupTimeColumn();
+ }
+
+ private void
checkGeneratedSegmentDataWhenTableTimeColumnIsUsedAsDedupTimeColumn()
+ throws IOException {
+ Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 3);
+ List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+ for (int i = 0; i < 2; i++) {
+ verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+ }
+ verifyGeneratedSegmentDataAgainstRawData(2, 3, rawData);
+ }
+
+ @Test
+ public void testDedupWithMetadataTTLWithDedupTimeColumn()
+ throws Exception {
+ setup(true, 1000, "dedupTime");
+ Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 2);
+ List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+ for (int i = 0; i < 2; i++) {
+ verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+ }
+ }
+
+ private void verifyGeneratedSegmentDataAgainstRawData(
+ int docId, int rawDataIndex, List<Map<String, String>> rawData) {
+ for (String columnName : rawData.get(0).keySet()) {
+ Assert.assertEquals(String.valueOf(_mutableSegmentImpl.getValue(docId,
columnName)),
+ String.valueOf(rawData.get(rawDataIndex).get(columnName)));
+ }
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 6a8ca27480..b23f203ec7 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -58,7 +59,7 @@ public class MutableSegmentImplTestUtils {
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
List<AggregationConfig> preAggregationConfigs) {
return createMutableSegmentImpl(schema, noDictionaryColumns,
varLengthDictionaryColumns, invertedIndexColumns,
- Collections.emptyMap(), false, false, null, null, null, null, null,
preAggregationConfigs);
+ Collections.emptyMap(), false, false, null, null, null, null, null,
null, preAggregationConfigs);
}
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
@@ -71,31 +72,31 @@ public class MutableSegmentImplTestUtils {
Set<String> varLengthDictionaryColumns, Set<String>
invertedIndexColumns, boolean aggregateMetrics,
boolean nullHandlingEnabled) {
return createMutableSegmentImpl(schema, noDictionaryColumns,
varLengthDictionaryColumns, invertedIndexColumns,
- aggregateMetrics, nullHandlingEnabled, null, null, null, null);
+ aggregateMetrics, nullHandlingEnabled, null, null, null, null, null);
}
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
Set<String> varLengthDictionaryColumns, Set<String>
invertedIndexColumns, boolean aggregateMetrics,
boolean nullHandlingEnabled, UpsertConfig upsertConfig, String
timeColumnName,
- PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager,
DedupConfig dedupConfig,
PartitionDedupMetadataManager partitionDedupMetadataManager) {
return createMutableSegmentImpl(schema, noDictionaryColumns,
varLengthDictionaryColumns, invertedIndexColumns,
Collections.emptyMap(), aggregateMetrics, nullHandlingEnabled,
upsertConfig, timeColumnName,
- partitionUpsertMetadataManager, partitionDedupMetadataManager, null,
Collections.emptyList());
+ partitionUpsertMetadataManager, dedupConfig,
partitionDedupMetadataManager, null, Collections.emptyList());
}
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
Map<String, JsonIndexConfig> jsonIndexConfigs, ServerMetrics
serverMetrics) {
return createMutableSegmentImpl(schema, noDictionaryColumns,
varLengthDictionaryColumns, invertedIndexColumns,
- jsonIndexConfigs, false, true, null, null, null, null, serverMetrics,
Collections.emptyList());
+ jsonIndexConfigs, false, true, null, null, null, null, null,
serverMetrics, Collections.emptyList());
}
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema,
Set<String> noDictionaryColumns,
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
Map<String, JsonIndexConfig> jsonIndexConfigs, boolean aggregateMetrics,
boolean nullHandlingEnabled,
UpsertConfig upsertConfig, String timeColumnName,
PartitionUpsertMetadataManager partitionUpsertMetadataManager,
- PartitionDedupMetadataManager partitionDedupMetadataManager,
ServerMetrics serverMetrics,
+ DedupConfig dedupConfig, PartitionDedupMetadataManager
partitionDedupMetadataManager, ServerMetrics serverMetrics,
List<AggregationConfig> aggregationConfigs) {
RealtimeSegmentStatsHistory statsHistory =
mock(RealtimeSegmentStatsHistory.class);
@@ -106,6 +107,7 @@ public class MutableSegmentImplTestUtils {
List<String> comparisonColumns = upsertConfig == null ? null :
upsertConfig.getComparisonColumns();
boolean isUpsertDropOutOfOrderRecord = upsertConfig == null ? false :
upsertConfig.isDropOutOfOrderRecord();
String upsertOutOfOrderRecordColumn = upsertConfig == null ? null :
upsertConfig.getOutOfOrderRecordColumn();
+ String dedupTimeColumn = dedupConfig == null ? null :
dedupConfig.getDedupTimeColumn();
DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false,
true);
RealtimeSegmentConfig.Builder segmentConfBuilder = new
RealtimeSegmentConfig.Builder()
.setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
@@ -119,10 +121,11 @@ public class MutableSegmentImplTestUtils {
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
.setUpsertComparisonColumns(comparisonColumns)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
- .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setIngestionAggregationConfigs(aggregationConfigs)
.setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
.setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn)
+ .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
+ .setDedupTimeColumn(dedupTimeColumn)
.setConsumerDir(TEMP_DIR.getAbsolutePath() + "/" + UUID.randomUUID() +
"/consumerDir");
for (Map.Entry<String, JsonIndexConfig> entry :
jsonIndexConfigs.entrySet()) {
segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(),
entry.getValue());
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index b7a73b238f..73ea233607 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -91,7 +91,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, upsertConfig,
"secondsSinceEpoch", _partitionUpsertMetadataManager,
- null);
+ null, null);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader =
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
_schema.getColumnNames(), null)) {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index cdc2fdbf77..4f46bd6d59 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -101,7 +101,7 @@ public class MutableSegmentImplUpsertTest {
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, upsertConfigWithHash,
"secondsSinceEpoch",
- _partitionUpsertMetadataManager, null);
+ _partitionUpsertMetadataManager, null, null);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader =
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
diff --git a/pinot-segment-local/src/test/resources/data/test_dedup_data.json
b/pinot-segment-local/src/test/resources/data/test_dedup_data.json
index 11f9050f53..dfc7ad0889 100644
--- a/pinot-segment-local/src/test/resources/data/test_dedup_data.json
+++ b/pinot-segment-local/src/test/resources/data/test_dedup_data.json
@@ -2,21 +2,25 @@
{
"event_id": "aa",
"description" : "first",
- "secondsSinceEpoch": 1567205394
+ "dedupTime": 5000,
+ "secondsSinceEpoch": 5000
},
{
"event_id": "bb",
"description" : "first",
- "secondsSinceEpoch": 1567205396
+ "dedupTime": 6000,
+ "secondsSinceEpoch": 1000
},
{
"event_id": "aa",
"description" : "second",
- "secondsSinceEpoch": 1567205397
+ "dedupTime": 3000,
+ "secondsSinceEpoch": 5500
},
{
"event_id": "bb",
"description" : "second",
- "secondsSinceEpoch": 1567205392
+ "dedupTime": 1000,
+ "secondsSinceEpoch": 7000
}
]
diff --git a/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
b/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
index 859c6f70c7..8c8b615916 100644
--- a/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
+++ b/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
@@ -8,6 +8,10 @@
{
"name": "description",
"dataType": "STRING"
+ },
+ {
+ "name": "dedupTime",
+ "dataType": "LONG"
}
],
"timeFieldSpec": {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index 17ff1e271a..78d7b3b9f0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -20,25 +20,47 @@ package org.apache.pinot.spi.config.table;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.Map;
import org.apache.pinot.spi.config.BaseJsonConfig;
public class DedupConfig extends BaseJsonConfig {
+ @JsonPropertyDescription("Whether dedup is enabled or not.")
private final boolean _dedupEnabled;
+ @JsonPropertyDescription("Function to hash the primary key.")
private final HashFunction _hashFunction;
+ @JsonPropertyDescription("Custom class for dedup metadata manager. If not
specified, the default implementation "
+ + "ConcurrentMapTableDedupMetadataManager will be used.")
private final String _metadataManagerClass;
+ @JsonPropertyDescription("Custom configs for dedup metadata manager.")
+ private final Map<String, String> _metadataManagerConfigs;
+ @JsonPropertyDescription("When larger than 0, use it for dedup metadata
cleanup, it uses the same unit as the "
+ + "dedup time column. The metadata will be cleaned up when the dedup
time is older than the current time "
+ + "minus metadata TTL. Notice that the metadata may not be cleaned up
immediately after the TTL, it depends on "
+ + "the cleanup schedule.")
+ private final double _metadataTTL;
+ @JsonPropertyDescription("Time column used to calculate dedup metadata TTL.
When it is not specified, the time column"
+ + " from the table config will be used.")
+ private final String _dedupTimeColumn;
public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true)
boolean dedupEnabled,
@JsonProperty(value = "hashFunction") HashFunction hashFunction) {
- this(dedupEnabled, hashFunction, null);
+ this(dedupEnabled, hashFunction, null, null, 0, null);
}
+
@JsonCreator
public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true)
boolean dedupEnabled,
@JsonProperty(value = "hashFunction") HashFunction hashFunction,
- @JsonProperty(value = "metadataManagerClass") String metadataManagerClass
- ) {
+ @JsonProperty(value = "metadataManagerClass") String
metadataManagerClass,
+ @JsonProperty(value = "metadataManagerConfigs") Map<String, String>
metadataManagerConfigs,
+ @JsonProperty(value = "metadataTTL") double metadataTTL,
+ @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn) {
_dedupEnabled = dedupEnabled;
_hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
_metadataManagerClass = metadataManagerClass;
+ _metadataManagerConfigs = metadataManagerConfigs;
+ _metadataTTL = metadataTTL;
+ _dedupTimeColumn = dedupTimeColumn;
}
public HashFunction getHashFunction() {
@@ -52,4 +74,16 @@ public class DedupConfig extends BaseJsonConfig {
public String getMetadataManagerClass() {
return _metadataManagerClass;
}
+
+ public Map<String, String> getMetadataManagerConfigs() {
+ return _metadataManagerConfigs;
+ }
+
+ public double getMetadataTTL() {
+ return _metadataTTL;
+ }
+
+ public String getDedupTimeColumn() {
+ return _dedupTimeColumn;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index b374855d59..90338cc7af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -394,6 +394,16 @@ public class TableConfig extends BaseJsonConfig {
return _upsertConfig == null ? 0 : _upsertConfig.getMetadataTTL();
}
+ @JsonIgnore
+ public String getDedupTimeColumn() {
+ return _dedupConfig == null ? null : _dedupConfig.getDedupTimeColumn();
+ }
+
+ @JsonIgnore
+ public double getDedupMetadataTTL() {
+ return _dedupConfig == null ? 0 : _dedupConfig.getMetadataTTL();
+ }
+
@JsonIgnore
@Nullable
public String getUpsertDeleteRecordColumn() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]