This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6bf696ec25 Make upsert metadata manager pluggable (#9186)
6bf696ec25 is described below
commit 6bf696ec25f2571c252c07099b81f9876f839b91
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 12 02:26:46 2022 -0700
Make upsert metadata manager pluggable (#9186)
---
.../manager/realtime/RealtimeTableDataManager.java | 26 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 4 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 2 +-
.../upsert/BaseTableUpsertMetadataManager.java | 79 ++++
...ncurrentMapPartitionUpsertMetadataManager.java} | 124 ++----
...> ConcurrentMapTableUpsertMetadataManager.java} | 36 +-
.../upsert/PartitionUpsertMetadataManager.java | 430 +--------------------
.../local/{utils => upsert}/RecordInfo.java | 5 +-
.../local/upsert/TableUpsertMetadataManager.java | 38 +-
.../upsert/TableUpsertMetadataManagerFactory.java | 65 ++++
.../pinot/segment/local/upsert/UpsertUtils.java | 76 ++++
.../dedup/PartitionDedupMetadataManagerTest.java | 2 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 8 +-
.../mutable/MutableSegmentImplUpsertTest.java | 7 +-
...rentMapPartitionUpsertMetadataManagerTest.java} | 16 +-
.../pinot/spi/config/table/UpsertConfig.java | 24 ++
16 files changed, 338 insertions(+), 604 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index f51da5195b..af7f366ebd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -57,9 +57,9 @@ import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
-import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -188,26 +188,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_tableUpsertMetadataManager);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
-
- List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
- Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
- "Primary key columns must be configured for upsert");
-
- String comparisonColumn = upsertConfig.getComparisonColumn();
- if (comparisonColumn == null) {
- comparisonColumn =
tableConfig.getValidationConfig().getTimeColumnName();
- }
-
- PartialUpsertHandler partialUpsertHandler = null;
- if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
- assert upsertConfig.getPartialUpsertStrategies() != null;
- partialUpsertHandler = new PartialUpsertHandler(schema,
upsertConfig.getPartialUpsertStrategies(),
- upsertConfig.getDefaultPartialUpsertStrategy(), comparisonColumn);
- }
-
- _tableUpsertMetadataManager =
- new TableUpsertMetadataManager(_tableNameWithType,
primaryKeyColumns, comparisonColumn,
- upsertConfig.getHashFunction(), partialUpsertHandler,
_serverMetrics);
+ _tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig, schema, this,
_serverMetrics);
}
}
@@ -264,7 +245,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
public boolean isPartialUpsertEnabled() {
- return _tableUpsertMetadataManager != null &&
_tableUpsertMetadataManager.isPartialUpsertEnabled();
+ return _tableUpsertMetadataManager != null
+ && _tableUpsertMetadataManager.getUpsertMode() ==
UpsertConfig.Mode.PARTIAL;
}
/*
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 4bfb67dfbe..e49b90e657 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -38,7 +38,7 @@ import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
@@ -125,7 +125,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
- new PartitionUpsertMetadataManager("testTable_REALTIME", 0,
Collections.singletonList("column6"),
+ new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME",
0, Collections.singletonList("column6"),
"daysSinceEpoch", HashFunction.NONE, null, serverMetrics), new
ThreadSafeMutableRoaringBitmap());
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 775fe06f5c..d6de8d0168 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -62,11 +62,11 @@ import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.IdMap;
import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.MutableSegment;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
new file mode 100644
index 0000000000..95666d3ea2
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+@ThreadSafe
+public abstract class BaseTableUpsertMetadataManager implements
TableUpsertMetadataManager {
+ protected String _tableNameWithType;
+ protected List<String> _primaryKeyColumns;
+ protected String _comparisonColumn;
+ protected HashFunction _hashFunction;
+ protected PartialUpsertHandler _partialUpsertHandler;
+ protected ServerMetrics _serverMetrics;
+
+ @Override
+ public void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager,
+ ServerMetrics serverMetrics) {
+ _tableNameWithType = tableConfig.getTableName();
+
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ Preconditions.checkArgument(upsertConfig != null && upsertConfig.getMode()
!= UpsertConfig.Mode.NONE,
+ "Upsert must be enabled for table: %s", _tableNameWithType);
+
+ _primaryKeyColumns = schema.getPrimaryKeyColumns();
+ Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
+ "Primary key columns must be configured for upsert enabled table: %s",
_tableNameWithType);
+
+ _comparisonColumn = upsertConfig.getComparisonColumn();
+ if (_comparisonColumn == null) {
+ _comparisonColumn =
tableConfig.getValidationConfig().getTimeColumnName();
+ }
+
+ _hashFunction = upsertConfig.getHashFunction();
+
+ if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies =
upsertConfig.getPartialUpsertStrategies();
+ Preconditions.checkArgument(partialUpsertStrategies != null,
+ "Partial-upsert strategies must be configured for partial-upsert
enabled table: %s", _tableNameWithType);
+ _partialUpsertHandler =
+ new PartialUpsertHandler(schema, partialUpsertStrategies,
upsertConfig.getDefaultPartialUpsertStrategy(),
+ _comparisonColumn);
+ }
+
+ _serverMetrics = serverMetrics;
+ }
+
+ @Override
+ public UpsertConfig.Mode getUpsertMode() {
+ return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL :
UpsertConfig.Mode.PARTIAL;
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
similarity index 84%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index d1042fe5c1..e6890c8a6c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -38,7 +38,6 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -47,7 +46,6 @@ import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
@@ -55,32 +53,11 @@ import org.slf4j.LoggerFactory;
/**
- * Manages the upsert metadata per partition.
- * <p>For multiple records with the same comparison value (default to
timestamp), the manager will preserve the latest
- * record based on the sequence number of the segment. If 2 records with the
same comparison value are in the same
- * segment, the one with larger doc id will be preserved. Note that for tables
with sorted column, the records will be
- * re-ordered when committing the segment, and we will use the re-ordered doc
ids instead of the ingestion doc ids to
- * decide the record to preserve.
- *
- * <p>There will be short term inconsistency when updating the upsert
metadata, but should be consistent after the
- * operation is done:
- * <ul>
- * <li>
- * When updating a new record, it first removes the doc id from the
current location, then update the new location.
- * </li>
- * <li>
- * When adding a new segment, it removes the doc ids from the current
locations before the segment being added to
- * the RealtimeTableDataManager.
- * </li>
- * <li>
- * When replacing an existing segment, after the record location being
replaced with the new segment, the following
- * updates applied to the new segment's valid doc ids won't be reflected
to the replaced segment's valid doc ids.
- * </li>
- * </ul>
+ * Implementation of {@link PartitionUpsertMetadataManager} that is backed by
a {@link ConcurrentHashMap}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@ThreadSafe
-public class PartitionUpsertMetadataManager {
+public class ConcurrentMapPartitionUpsertMetadataManager implements
PartitionUpsertMetadataManager {
private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS =
TimeUnit.MINUTES.toNanos(1);
private final String _tableNameWithType;
@@ -105,9 +82,9 @@ public class PartitionUpsertMetadataManager {
private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
private int _numOutOfOrderEvents = 0;
- public PartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, List<String> primaryKeyColumns,
- String comparisonColumn, HashFunction hashFunction, @Nullable
PartialUpsertHandler partialUpsertHandler,
- ServerMetrics serverMetrics) {
+ public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType,
int partitionId,
+ List<String> primaryKeyColumns, String comparisonColumn, HashFunction
hashFunction,
+ @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics
serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -118,16 +95,12 @@ public class PartitionUpsertMetadataManager {
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
}
- /**
- * Returns the primary key columns.
- */
+ @Override
public List<String> getPrimaryKeyColumns() {
return _primaryKeyColumns;
}
- /**
- * Initializes the upsert metadata for the given immutable segment.
- */
+ @Override
public void addSegment(ImmutableSegment segment) {
addSegment(segment, null, null);
}
@@ -154,7 +127,7 @@ public class PartitionUpsertMetadataManager {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
if (recordInfoIterator == null) {
- recordInfoIterator = getRecordInfoIterator(segment);
+ recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment,
_primaryKeyColumns, _comparisonColumn);
}
addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
recordInfoIterator, null, null);
} finally {
@@ -169,42 +142,6 @@ public class PartitionUpsertMetadataManager {
_logger.info("Finished adding segment: {}, current primary key count: {}",
segmentName, numPrimaryKeys);
}
- private Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment)
{
- int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
- return new Iterator<RecordInfo>() {
- private int _docId = 0;
-
- @Override
- public boolean hasNext() {
- return _docId < numTotalDocs;
- }
-
- @Override
- public RecordInfo next() {
- PrimaryKey primaryKey = new PrimaryKey(new
Object[_primaryKeyColumns.size()]);
- getPrimaryKey(segment, _docId, primaryKey);
-
- Object comparisonValue = segment.getValue(_docId, _comparisonColumn);
- if (comparisonValue instanceof byte[]) {
- comparisonValue = new ByteArray((byte[]) comparisonValue);
- }
- return new RecordInfo(primaryKey, _docId++, (Comparable)
comparisonValue);
- }
- };
- }
-
- private void getPrimaryKey(IndexSegment segment, int docId, PrimaryKey
buffer) {
- Object[] values = buffer.getValues();
- int numPrimaryKeyColumns = values.length;
- for (int i = 0; i < numPrimaryKeyColumns; i++) {
- Object value = segment.getValue(docId, _primaryKeyColumns.get(i));
- if (value instanceof byte[]) {
- value = new ByteArray((byte[]) value);
- }
- values[i] = value;
- }
- }
-
private void addOrReplaceSegment(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment
oldSegment,
@Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
@@ -293,9 +230,7 @@ public class PartitionUpsertMetadataManager {
}
}
- /**
- * Updates the upsert metadata for a new consumed record in the given
consuming segment.
- */
+ @Override
public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds =
Objects.requireNonNull(segment.getValidDocIds());
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
@@ -330,9 +265,7 @@ public class PartitionUpsertMetadataManager {
_primaryKeyToRecordLocationMap.size());
}
- /**
- * Replaces the upsert metadata for the old segment with the new immutable
segment.
- */
+ @Override
public void replaceSegment(ImmutableSegment segment, IndexSegment
oldSegment) {
replaceSegment(segment, null, null, oldSegment);
}
@@ -363,7 +296,7 @@ public class PartitionUpsertMetadataManager {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
if (recordInfoIterator == null) {
- recordInfoIterator = getRecordInfoIterator(segment);
+ recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment,
_primaryKeyColumns, _comparisonColumn);
}
addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
recordInfoIterator, oldSegment,
validDocIdsForOldSegment);
@@ -402,9 +335,7 @@ public class PartitionUpsertMetadataManager {
_logger.info("Finished replacing segment: {}, current primary key count:
{}", segmentName, numPrimaryKeys);
}
- /**
- * Removes the upsert metadata for the given segment.
- */
+ @Override
public void removeSegment(IndexSegment segment) {
String segmentName = segment.getSegmentName();
_logger.info("Removing {} segment: {}, current primary key count: {}",
@@ -446,7 +377,7 @@ public class PartitionUpsertMetadataManager {
PeekableIntIterator iterator = validDocIds.getIntIterator();
while (iterator.hasNext()) {
int docId = iterator.next();
- getPrimaryKey(segment, docId, primaryKey);
+ UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docId,
primaryKey);
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
_hashFunction),
(pk, recordLocation) -> {
if (recordLocation.getSegment() == segment) {
@@ -457,9 +388,7 @@ public class PartitionUpsertMetadataManager {
}
}
- /**
- * Returns the merged record when partial-upsert is enabled.
- */
+ @Override
public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
// Directly return the record when partial-upsert is not enabled
if (_partialUpsertHandler == null) {
@@ -498,4 +427,29 @@ public class PartitionUpsertMetadataManager {
return record;
}
}
+
+ @VisibleForTesting
+ static class RecordLocation {
+ private final IndexSegment _segment;
+ private final int _docId;
+ private final Comparable _comparisonValue;
+
+ public RecordLocation(IndexSegment indexSegment, int docId, Comparable
comparisonValue) {
+ _segment = indexSegment;
+ _docId = docId;
+ _comparisonValue = comparisonValue;
+ }
+
+ public IndexSegment getSegment() {
+ return _segment;
+ }
+
+ public int getDocId() {
+ return _docId;
+ }
+
+ public Comparable getComparisonValue() {
+ return _comparisonValue;
+ }
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
similarity index 51%
rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 35dff04a75..67474e145d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -18,33 +18,23 @@
*/
package org.apache.pinot.segment.local.upsert;
-import org.apache.pinot.segment.spi.IndexSegment;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
/**
- * Indicate a record's location on the local host.
+ * Implementation of {@link TableUpsertMetadataManager} that is backed by a
{@link ConcurrentHashMap}.
*/
-public class RecordLocation {
- private final IndexSegment _segment;
- private final int _docId;
- /** value used to denote the order */
- private final Comparable _comparisonValue;
+@ThreadSafe
+public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMetadataManager {
+ private final Map<Integer, ConcurrentMapPartitionUpsertMetadataManager>
_partitionMetadataManagerMap =
+ new ConcurrentHashMap<>();
- public RecordLocation(IndexSegment indexSegment, int docId, Comparable
comparisonValue) {
- _segment = indexSegment;
- _docId = docId;
- _comparisonValue = comparisonValue;
- }
-
- public IndexSegment getSegment() {
- return _segment;
- }
-
- public int getDocId() {
- return _docId;
- }
-
- public Comparable getComparisonValue() {
- return _comparisonValue;
+ @Override
+ public ConcurrentMapPartitionUpsertMetadataManager
getOrCreatePartitionManager(int partitionId) {
+ return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
+ k -> new
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k,
_primaryKeyColumns,
+ _comparisonColumn, _hashFunction, _partialUpsertHandler,
_serverMetrics));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index d1042fe5c1..2c5f68df45 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -18,40 +18,12 @@
*/
package org.apache.pinot.segment.local.upsert;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metrics.ServerGauge;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
-import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.roaringbitmap.PeekableIntIterator;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -78,424 +50,36 @@ import org.slf4j.LoggerFactory;
* </li>
* </ul>
*/
-@SuppressWarnings({"rawtypes", "unchecked"})
@ThreadSafe
-public class PartitionUpsertMetadataManager {
- private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS =
TimeUnit.MINUTES.toNanos(1);
-
- private final String _tableNameWithType;
- private final int _partitionId;
- private final List<String> _primaryKeyColumns;
- private final String _comparisonColumn;
- private final HashFunction _hashFunction;
- private final PartialUpsertHandler _partialUpsertHandler;
- private final ServerMetrics _serverMetrics;
- private final Logger _logger;
-
- // TODO(upsert): consider an off-heap KV store to persist this mapping to
improve the recovery speed.
- @VisibleForTesting
- final ConcurrentHashMap<Object, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
- @VisibleForTesting
- final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
-
- // Reused for reading previous record during partial upsert
- private final GenericRow _reuse = new GenericRow();
-
- private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
- private int _numOutOfOrderEvents = 0;
-
- public PartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, List<String> primaryKeyColumns,
- String comparisonColumn, HashFunction hashFunction, @Nullable
PartialUpsertHandler partialUpsertHandler,
- ServerMetrics serverMetrics) {
- _tableNameWithType = tableNameWithType;
- _partitionId = partitionId;
- _primaryKeyColumns = primaryKeyColumns;
- _comparisonColumn = comparisonColumn;
- _hashFunction = hashFunction;
- _partialUpsertHandler = partialUpsertHandler;
- _serverMetrics = serverMetrics;
- _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
- }
+public interface PartitionUpsertMetadataManager {
/**
* Returns the primary key columns.
*/
- public List<String> getPrimaryKeyColumns() {
- return _primaryKeyColumns;
- }
+ List<String> getPrimaryKeyColumns();
/**
* Initializes the upsert metadata for the given immutable segment.
*/
- public void addSegment(ImmutableSegment segment) {
- addSegment(segment, null, null);
- }
-
- @VisibleForTesting
- void addSegment(ImmutableSegment segment, @Nullable
ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable Iterator<RecordInfo> recordInfoIterator) {
- String segmentName = segment.getSegmentName();
- _logger.info("Adding segment: {}, current primary key count: {}",
segmentName,
- _primaryKeyToRecordLocationMap.size());
-
- if (segment instanceof EmptyIndexSegment) {
- _logger.info("Skip adding empty segment: {}", segmentName);
- return;
- }
-
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType,
segmentName);
- segmentLock.lock();
- try {
- Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
- "Got unsupported segment implementation: {} for segment: {}, table:
{}", segment.getClass(), segmentName,
- _tableNameWithType);
- if (validDocIds == null) {
- validDocIds = new ThreadSafeMutableRoaringBitmap();
- }
- if (recordInfoIterator == null) {
- recordInfoIterator = getRecordInfoIterator(segment);
- }
- addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
recordInfoIterator, null, null);
- } finally {
- segmentLock.unlock();
- }
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished adding segment: {}, current primary key count: {}",
segmentName, numPrimaryKeys);
- }
-
- private Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment)
{
- int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
- return new Iterator<RecordInfo>() {
- private int _docId = 0;
-
- @Override
- public boolean hasNext() {
- return _docId < numTotalDocs;
- }
-
- @Override
- public RecordInfo next() {
- PrimaryKey primaryKey = new PrimaryKey(new
Object[_primaryKeyColumns.size()]);
- getPrimaryKey(segment, _docId, primaryKey);
-
- Object comparisonValue = segment.getValue(_docId, _comparisonColumn);
- if (comparisonValue instanceof byte[]) {
- comparisonValue = new ByteArray((byte[]) comparisonValue);
- }
- return new RecordInfo(primaryKey, _docId++, (Comparable)
comparisonValue);
- }
- };
- }
-
- private void getPrimaryKey(IndexSegment segment, int docId, PrimaryKey
buffer) {
- Object[] values = buffer.getValues();
- int numPrimaryKeyColumns = values.length;
- for (int i = 0; i < numPrimaryKeyColumns; i++) {
- Object value = segment.getValue(docId, _primaryKeyColumns.get(i));
- if (value instanceof byte[]) {
- value = new ByteArray((byte[]) value);
- }
- values[i] = value;
- }
- }
-
- private void addOrReplaceSegment(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
- Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment
oldSegment,
- @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
- String segmentName = segment.getSegmentName();
- segment.enableUpsert(this, validDocIds);
-
- AtomicInteger numKeysInWrongSegment = new AtomicInteger();
- while (recordInfoIterator.hasNext()) {
- RecordInfo recordInfo = recordInfoIterator.next();
-
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
- (primaryKey, currentRecordLocation) -> {
- if (currentRecordLocation != null) {
- // Existing primary key
- IndexSegment currentSegment = currentRecordLocation.getSegment();
- int comparisonResult =
-
recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue());
-
- // The current record is in the same segment
- // Update the record location when there is a tie to keep the
newer record. Note that the record info
- // iterator will return records with incremental doc ids.
- if (currentSegment == segment) {
- if (comparisonResult >= 0) {
- validDocIds.replace(currentRecordLocation.getDocId(),
recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- } else {
- return currentRecordLocation;
- }
- }
-
- // The current record is in an old segment being replaced
- // This could happen when committing a consuming segment, or
reloading a completed segment. In this
- // case, we want to update the record location when there is a
tie because the record locations should
- // point to the new added segment instead of the old segment
being replaced. Also, do not update the valid
- // doc ids for the old segment because it has not been replaced
yet. We pass in an optional valid doc ids
- // snapshot for the old segment, which can be updated and used
to track the docs not replaced yet.
- if (currentSegment == oldSegment) {
- if (comparisonResult >= 0) {
- validDocIds.add(recordInfo.getDocId());
- if (validDocIdsForOldSegment != null) {
-
validDocIdsForOldSegment.remove(currentRecordLocation.getDocId());
- }
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- } else {
- return currentRecordLocation;
- }
- }
-
- // This should not happen because the previously replaced
segment should have all keys removed. We still
- // handle it here, and also track the number of keys not
properly replaced previously.
- String currentSegmentName = currentSegment.getSegmentName();
- if (currentSegmentName.equals(segmentName)) {
- numKeysInWrongSegment.getAndIncrement();
- if (comparisonResult >= 0) {
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- } else {
- return currentRecordLocation;
- }
- }
-
- // The current record is in a different segment
- // Update the record location when getting a newer comparison
value, or the value is the same as the
- // current value, but the segment has a larger sequence number
(the segment is newer than the current
- // segment).
- if (comparisonResult > 0 || (comparisonResult == 0 &&
LLCSegmentName.isLowLevelConsumerSegmentName(
- segmentName) &&
LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
- && LLCSegmentName.getSequenceNumber(segmentName) >
LLCSegmentName.getSequenceNumber(
- currentSegmentName))) {
-
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- } else {
- return currentRecordLocation;
- }
- } else {
- // New primary key
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- }
- });
- }
- int numKeys = numKeysInWrongSegment.get();
- if (numKeys > 0) {
- _logger.warn("Found {} primary keys in the wrong segment when adding
segment: {}", numKeys, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
- }
- }
+ void addSegment(ImmutableSegment segment);
/**
* Updates the upsert metadata for a new consumed record in the given
consuming segment.
*/
- public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
- ThreadSafeMutableRoaringBitmap validDocIds =
Objects.requireNonNull(segment.getValidDocIds());
-
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
- (primaryKey, currentRecordLocation) -> {
- if (currentRecordLocation != null) {
- // Existing primary key
-
- // Update the record location when the new comparison value is
greater than or equal to the current value.
- // Update the record location when there is a tie to keep the
newer record.
- if
(recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue())
>= 0) {
- IndexSegment currentSegment = currentRecordLocation.getSegment();
- int currentDocId = currentRecordLocation.getDocId();
- if (segment == currentSegment) {
- validDocIds.replace(currentDocId, recordInfo.getDocId());
- } else {
-
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId);
- validDocIds.add(recordInfo.getDocId());
- }
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- } else {
- return currentRecordLocation;
- }
- } else {
- // New primary key
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(),
recordInfo.getComparisonValue());
- }
- });
-
- // Update metrics
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- _primaryKeyToRecordLocationMap.size());
- }
+ void addRecord(MutableSegment segment, RecordInfo recordInfo);
/**
* Replaces the upsert metadata for the old segment with the new immutable
segment.
*/
- public void replaceSegment(ImmutableSegment segment, IndexSegment
oldSegment) {
- replaceSegment(segment, null, null, oldSegment);
- }
-
- @VisibleForTesting
- void replaceSegment(ImmutableSegment segment, @Nullable
ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment
oldSegment) {
- String segmentName = segment.getSegmentName();
-
Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
- "Cannot replace segment with different name for table: {}, old
segment: {}, new segment: {}",
- _tableNameWithType, oldSegment.getSegmentName(), segmentName);
- _logger.info("Replacing {} segment: {}, current primary key count: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName,
- _primaryKeyToRecordLocationMap.size());
-
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType,
segmentName);
- segmentLock.lock();
- try {
- MutableRoaringBitmap validDocIdsForOldSegment =
- oldSegment.getValidDocIds() != null ?
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (segment instanceof EmptyIndexSegment) {
- _logger.info("Skip adding empty segment: {}", segmentName);
- } else {
- Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
- "Got unsupported segment implementation: {} for segment: {},
table: {}", segment.getClass(), segmentName,
- _tableNameWithType);
- if (validDocIds == null) {
- validDocIds = new ThreadSafeMutableRoaringBitmap();
- }
- if (recordInfoIterator == null) {
- recordInfoIterator = getRecordInfoIterator(segment);
- }
- addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
recordInfoIterator, oldSegment,
- validDocIdsForOldSegment);
- }
-
- if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original
record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between
replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is
consumed from a different server with
- // different records (some stream consumer cannot guarantee
consuming the messages in the same order).
- _logger.warn("Found {} primary keys not replaced when replacing
segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas",
numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
- } else {
- _logger.info("Found {} primary keys not replaced when replacing
segment: {}", numKeysNotReplaced,
- segmentName);
- }
- removeSegment(oldSegment, validDocIdsForOldSegment);
- }
- } finally {
- segmentLock.unlock();
- }
-
- if (!(oldSegment instanceof EmptyIndexSegment)) {
- _replacedSegments.add(oldSegment);
- }
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished replacing segment: {}, current primary key count:
{}", segmentName, numPrimaryKeys);
- }
+ void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment);
/**
* Removes the upsert metadata for the given segment.
*/
- public void removeSegment(IndexSegment segment) {
- String segmentName = segment.getSegmentName();
- _logger.info("Removing {} segment: {}, current primary key count: {}",
- segment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName,
- _primaryKeyToRecordLocationMap.size());
-
- if (_replacedSegments.remove(segment)) {
- _logger.info("Skip removing replaced segment: {}", segmentName);
- return;
- }
-
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType,
segmentName);
- segmentLock.lock();
- try {
- MutableRoaringBitmap validDocIds =
- segment.getValidDocIds() != null ?
segment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (validDocIds == null || validDocIds.isEmpty()) {
- _logger.info("Skip removing segment without valid docs: {}",
segmentName);
- return;
- }
-
- _logger.info("Removing {} primary keys for segment: {}",
validDocIds.getCardinality(), segmentName);
- removeSegment(segment, validDocIds);
- } finally {
- segmentLock.unlock();
- }
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished removing segment: {}, current primary key count:
{}", segmentName, numPrimaryKeys);
- }
-
- private void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
- assert !validDocIds.isEmpty();
- PrimaryKey primaryKey = new PrimaryKey(new
Object[_primaryKeyColumns.size()]);
- PeekableIntIterator iterator = validDocIds.getIntIterator();
- while (iterator.hasNext()) {
- int docId = iterator.next();
- getPrimaryKey(segment, docId, primaryKey);
-
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
_hashFunction),
- (pk, recordLocation) -> {
- if (recordLocation.getSegment() == segment) {
- return null;
- }
- return recordLocation;
- });
- }
- }
+ void removeSegment(IndexSegment segment);
/**
* Returns the merged record when partial-upsert is enabled.
*/
- public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
- // Directly return the record when partial-upsert is not enabled
- if (_partialUpsertHandler == null) {
- return record;
- }
-
- AtomicReference<GenericRow> previousRecordReference = new
AtomicReference<>();
- RecordLocation currentRecordLocation =
_primaryKeyToRecordLocationMap.computeIfPresent(
- HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(pk, recordLocation) -> {
- if
(recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue())
>= 0) {
- _reuse.clear();
-
previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(),
_reuse));
- }
- return recordLocation;
- });
- if (currentRecordLocation != null) {
- // Existing primary key
- GenericRow previousRecord = previousRecordReference.get();
- if (previousRecord != null) {
- return _partialUpsertHandler.merge(previousRecord, record);
- } else {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
- _numOutOfOrderEvents++;
- long currentTimeNs = System.nanoTime();
- if (currentTimeNs - _lastOutOfOrderEventReportTimeNs >
OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
- _logger.warn("Skipped {} out-of-order events for partial-upsert
table (the last event has current comparison "
- + "value: {}, record comparison value: {})",
_numOutOfOrderEvents,
- currentRecordLocation.getComparisonValue(),
recordInfo.getComparisonValue());
- _lastOutOfOrderEventReportTimeNs = currentTimeNs;
- _numOutOfOrderEvents = 0;
- }
- return record;
- }
- } else {
- // New primary key
- return record;
- }
- }
+ GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
similarity index 92%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
index 5e235c86e0..f4f139f6c3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.local.utils;
+package org.apache.pinot.segment.local.upsert;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-public final class RecordInfo {
+@SuppressWarnings("rawtypes")
+public class RecordInfo {
private final PrimaryKey _primaryKey;
private final int _docId;
private final Comparable _comparisonValue;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 108438e95e..ffafb999ee 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -18,45 +18,35 @@
*/
package org.apache.pinot.segment.local.upsert;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
/**
* The manager of the upsert metadata of a table.
*/
@ThreadSafe
-public class TableUpsertMetadataManager {
- private final Map<Integer, PartitionUpsertMetadataManager>
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
- private final String _tableNameWithType;
- private final List<String> _primaryKeyColumns;
- private final String _comparisonColumn;
- private final HashFunction _hashFunction;
- private final PartialUpsertHandler _partialUpsertHandler;
- private final ServerMetrics _serverMetrics;
+public interface TableUpsertMetadataManager {
- public TableUpsertMetadataManager(String tableNameWithType, List<String>
primaryKeyColumns, String comparisonColumn,
- HashFunction hashFunction, @Nullable PartialUpsertHandler
partialUpsertHandler, ServerMetrics serverMetrics) {
- _tableNameWithType = tableNameWithType;
- _primaryKeyColumns = primaryKeyColumns;
- _comparisonColumn = comparisonColumn;
- _hashFunction = hashFunction;
- _partialUpsertHandler = partialUpsertHandler;
- _serverMetrics = serverMetrics;
- }
+ /**
+ * Initializes this manager for the given table. {@link TableUpsertMetadataManagerFactory} calls it once, right
+ * after constructing the instance with its no-arg constructor.
+ */
+ void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager, ServerMetrics serverMetrics);
- public PartitionUpsertMetadataManager getOrCreatePartitionManager(int
partitionId) {
- return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
- k -> new PartitionUpsertMetadataManager(_tableNameWithType, k,
_primaryKeyColumns, _comparisonColumn,
- _hashFunction, _partialUpsertHandler, _serverMetrics));
- }
+ /**
+ * Returns the upsert metadata manager of the given partition, creating it on first access.
+ * <p>Declared with the {@link PartitionUpsertMetadataManager} interface (not a concrete class) so that a plugged-in
+ * table manager can supply its own partition manager; implementations may still narrow the return type.
+ */
+ PartitionUpsertMetadataManager getOrCreatePartitionManager(int
partitionId);
- public boolean isPartialUpsertEnabled() {
- return _partialUpsertHandler != null;
- }
+ /**
+ * Returns the upsert mode of the table.
+ */
+ UpsertConfig.Mode getUpsertMode();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
new file mode 100644
index 0000000000..989f5a1e2c
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Creates and initializes the {@link TableUpsertMetadataManager} of an upsert table.
+ * <p>A custom implementation can be plugged in through {@link UpsertConfig#getMetadataManagerClass()}; it must have a
+ * public no-arg constructor. When no class is configured, {@link ConcurrentMapTableUpsertMetadataManager} is used.
+ */
+public final class TableUpsertMetadataManagerFactory {
+ private TableUpsertMetadataManagerFactory() {
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
+
+ /**
+ * Creates the upsert metadata manager for the given table and calls
+ * {@link TableUpsertMetadataManager#init} on it before returning it.
+ *
+ * @throws IllegalArgumentException if the table config has no upsert config
+ * @throws RuntimeException if the configured class cannot be loaded, instantiated or cast to
+ * {@link TableUpsertMetadataManager}; the original exception is kept as the cause
+ */
+ public static TableUpsertMetadataManager create(TableConfig tableConfig,
Schema schema,
+ TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+ String tableNameWithType = tableConfig.getTableName();
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ Preconditions.checkArgument(upsertConfig != null, "Must provide upsert
config for table: %s", tableNameWithType);
+
+ TableUpsertMetadataManager metadataManager;
+ String metadataManagerClass = upsertConfig.getMetadataManagerClass();
+ if (StringUtils.isNotEmpty(metadataManagerClass)) {
+ LOGGER.info("Creating TableUpsertMetadataManager with class: {} for
table: {}", metadataManagerClass,
+ tableNameWithType);
+ try {
+ metadataManager =
+ (TableUpsertMetadataManager)
Class.forName(metadataManagerClass).getConstructor().newInstance();
+ } catch (Exception e) {
+ // Keep the original exception as the cause, otherwise the actual failure (class not found, missing public
+ // no-arg constructor, wrong type, constructor failure) is lost
+ throw new RuntimeException(
+ String.format("Caught exception constructing
TableUpsertMetadataManager with class: %s for table: %s",
+ metadataManagerClass, tableNameWithType), e);
+ }
+ } else {
+ LOGGER.info("Creating ConcurrentMapTableUpsertMetadataManager for table:
{}", tableNameWithType);
+ metadataManager = new ConcurrentMapTableUpsertMetadataManager();
+ }
+
+ metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics);
+ return metadataManager;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
new file mode 100644
index 0000000000..a7ea1b92f7
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Static helpers for reading upsert related values (primary keys and comparison values) from segments.
+ */
+@SuppressWarnings("rawtypes")
+public final class UpsertUtils {
+ private UpsertUtils() {
+ }
+
+ /**
+ * Returns an iterator of {@link RecordInfo} from the segment.
+ * <p>Docs are returned in increasing doc id order, from 0 up to (excluding) the segment's total doc count. Raw
+ * {@code byte[]} values are wrapped into {@link ByteArray} because arrays use identity equality and are not
+ * {@link Comparable}. The iterator's {@code next()} throws {@link NoSuchElementException} once it is exhausted.
+ */
+ public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment
segment, List<String> primaryKeyColumns,
+ String comparisonColumn) {
+ int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
+ return new Iterator<RecordInfo>() {
+ private int _docId = 0;
+
+ @Override
+ public boolean hasNext() {
+ return _docId < numTotalDocs;
+ }
+
+ @Override
+ public RecordInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // A new PrimaryKey is allocated per record because each returned RecordInfo holds its own PrimaryKey
+ PrimaryKey primaryKey = new PrimaryKey(new
Object[primaryKeyColumns.size()]);
+ getPrimaryKey(segment, primaryKeyColumns, _docId, primaryKey);
+
+ Object comparisonValue = segment.getValue(_docId, comparisonColumn);
+ if (comparisonValue instanceof byte[]) {
+ comparisonValue = new ByteArray((byte[]) comparisonValue);
+ }
+ return new RecordInfo(primaryKey, _docId++, (Comparable)
comparisonValue);
+ }
+ };
+ }
+
+ /**
+ * Reads the primary key of the given doc from the segment into {@code buffer}, overwriting its values in place.
+ * <p>Slot {@code i} of the buffer is read from column {@code primaryKeyColumns.get(i)}, for every slot of the buffer,
+ * so the buffer is expected to have one slot per primary key column. Raw {@code byte[]} values become {@link ByteArray}.
+ */
+ public static void getPrimaryKey(IndexSegment segment, List<String>
primaryKeyColumns, int docId, PrimaryKey buffer) {
+ Object[] values = buffer.getValues();
+ int numPrimaryKeyColumns = values.length;
+ for (int i = 0; i < numPrimaryKeyColumns; i++) {
+ Object value = segment.getValue(docId, primaryKeyColumns.get(i));
+ if (value instanceof byte[]) {
+ value = new ByteArray((byte[]) value);
+ }
+ values[i] = value;
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index 39f7d6ae98..ca305278ca 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -25,8 +25,8 @@ import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.PrimaryKey;
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 08cbf3e91c..a8d8e956c4 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -22,10 +22,10 @@ import java.io.File;
import java.net.URL;
import java.util.Collections;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
-import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -66,8 +66,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
- new TableUpsertMetadataManager("testTable_REALTIME",
_schema.getPrimaryKeyColumns(), "offset",
- HashFunction.NONE, null,
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+ TableUpsertMetadataManagerFactory.create(_tableConfig, _schema,
mock(TableDataManager.class),
+ mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, offsetUpsertConfig,
"secondsSinceEpoch",
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 015ba5705a..a2439d9f8f 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -22,9 +22,10 @@ import java.io.File;
import java.net.URL;
import java.util.Collections;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -64,8 +65,8 @@ public class MutableSegmentImplUpsertTest {
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
- new TableUpsertMetadataManager("testTable_REALTIME",
_schema.getPrimaryKeyColumns(), "secondsSinceEpoch",
- hashFunction, null,
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+ TableUpsertMetadataManagerFactory.create(_tableConfig, _schema,
mock(TableDataManager.class),
+ mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, upsertConfigWithHash,
"secondsSinceEpoch",
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
similarity index 96%
rename from
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
rename to
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index aa1392d6df..5a6e45573c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -26,8 +26,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -49,7 +49,7 @@ import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
-public class PartitionUpsertMetadataManagerTest {
+public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@@ -61,9 +61,9 @@ public class PartitionUpsertMetadataManagerTest {
}
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) {
- PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Collections.singletonList("pk"), "timeCol",
- hashFunction, null, mock(ServerMetrics.class));
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ "timeCol", hashFunction, null, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -231,9 +231,9 @@ public class PartitionUpsertMetadataManagerTest {
}
private void verifyAddRecord(HashFunction hashFunction) {
- PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Collections.singletonList("pk"), "timeCol",
- hashFunction, null, mock(ServerMetrics.class));
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ "timeCol", hashFunction, null, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index f687be7501..162e6b3030 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -54,6 +54,12 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Column for upsert comparison, default to time
column")
private String _comparisonColumn;
+ @JsonPropertyDescription("Custom class for upsert metadata manager")
+ private String _metadataManagerClass;
+
+ @JsonPropertyDescription("Custom configs for upsert metadata manager")
+ private Map<String, String> _metadataManagerConfigs;
+
@Deprecated
public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
@JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy>
partialUpsertStrategies,
@@ -105,6 +111,16 @@ public class UpsertConfig extends BaseJsonConfig {
return _comparisonColumn;
}
+ @Nullable
+ public String getMetadataManagerClass() {
+ return _metadataManagerClass;
+ }
+
+ @Nullable
+ public Map<String, String> getMetadataManagerConfigs() {
+ return _metadataManagerConfigs;
+ }
+
public void setHashFunction(HashFunction hashFunction) {
_hashFunction = hashFunction;
}
@@ -136,4 +152,12 @@ public class UpsertConfig extends BaseJsonConfig {
public void setComparisonColumn(String comparisonColumn) {
_comparisonColumn = comparisonColumn;
}
+
+ public void setMetadataManagerClass(String metadataManagerClass) {
+ _metadataManagerClass = metadataManagerClass;
+ }
+
+ public void setMetadataManagerConfigs(Map<String, String>
metadataManagerConfigs) {
+ _metadataManagerConfigs = metadataManagerConfigs;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]