This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f4e5518 Add partial upsert config and mergers (#6899)
f4e5518 is described below
commit f4e55182a8c552c8493d1693868b0c752b1a1d56
Author: deemoliu <[email protected]>
AuthorDate: Tue Jun 22 11:45:29 2021 -0700
Add partial upsert config and mergers (#6899)
Add the following pieces for partial upsert:
- The upsert metadata and upsert data manager changes to support the
previous-record lookup process
- Partial upsert merger and handler framework for different partial upsert
strategies
- Null value handling during partial upsert
- Make sure all segments are online before consuming
- Table config validation
- A partial upsert quickstart demo using MeetupEventRSVP
---
.../manager/realtime/RealtimeTableDataManager.java | 46 ++++--
.../org/apache/pinot/core/plan/FilterPlanNode.java | 12 +-
.../core/plan/maker/InstancePlanMakerImplV2.java | 2 +-
.../query/pruner/SelectionQuerySegmentPruner.java | 2 +-
.../realtime/LLRealtimeSegmentDataManagerTest.java | 12 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 4 +-
.../pruner/SelectionQuerySegmentPrunerTest.java | 6 +-
.../indexsegment/immutable/EmptyIndexSegment.java | 4 +-
.../immutable/ImmutableSegmentImpl.java | 18 +-
.../indexsegment/mutable/IntermediateSegment.java | 4 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 65 ++++----
.../realtime/impl/geospatial/MutableH3Index.java | 2 +-
.../invertedindex/RealtimeInvertedIndexReader.java | 2 +-
.../impl/nullvalue/MutableNullValueVector.java | 2 +-
.../index/readers/ValidDocIndexReaderImpl.java | 2 +-
.../segment/local/upsert/PartialUpsertHandler.java | 142 ++++++++++++++++
.../upsert/PartitionUpsertMetadataManager.java | 161 +++++++++++-------
.../pinot/segment/local/upsert/RecordLocation.java | 18 +-
.../local/upsert/TableUpsertMetadataManager.java | 10 +-
.../merger/IncrementMerger.java} | 33 ++--
.../local/upsert/merger/OverwriteMerger.java | 27 +--
.../local/upsert/merger/PartialUpsertMerger.java | 32 ++--
.../upsert/merger/PartialUpsertMergerFactory.java | 31 ++--
.../segment/local/utils/TableConfigUtils.java | 40 ++++-
.../mutable/MutableSegmentImplUpsertTest.java | 4 +-
.../local/upsert/PartialUpsertHandlerTest.java | 81 +++++++++
.../upsert/PartitionUpsertMetadataManagerTest.java | 94 ++++++-----
.../segment/local/utils}/TableConfigUtilsTest.java | 181 +++++++++++++++------
.../org/apache/pinot/segment/spi/IndexSegment.java | 4 +-
.../spi/index}/ThreadSafeMutableRoaringBitmap.java | 2 +-
.../pinot/spi/config/table/UpsertConfig.java | 28 +++-
.../apache/pinot/spi/data/readers/GenericRow.java | 10 +-
.../pinot/spi/config/table/UpsertConfigTest.java | 18 +-
pinot-tools/pom.xml | 11 ++
.../pinot/tools/PartialUpsertQuickStart.java | 119 ++++++++++++++
...t_partial_meetupRsvp_realtime_table_config.json | 53 ++++++
36 files changed, 944 insertions(+), 338 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b1a9e2d..4cb8cca 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -54,17 +54,18 @@ import
org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import
org.apache.pinot.segment.local.segment.index.loader.V3RemoveIndexException;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -158,9 +159,18 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
_upsertMode = tableConfig.getUpsertMode();
if (isUpsertEnabled()) {
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ assert upsertConfig != null;
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
- _tableUpsertMetadataManager = new
TableUpsertMetadataManager(_tableNameWithType, _serverMetrics);
+
+ PartialUpsertHandler partialUpsertHandler = null;
+ if (isPartialUpsertEnabled()) {
+ partialUpsertHandler =
+ new PartialUpsertHandler(_helixManager, _tableNameWithType,
upsertConfig.getPartialUpsertStrategies());
+ }
+ _tableUpsertMetadataManager =
+ new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics,
partialUpsertHandler);
_primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
"Primary key columns must be configured for upsert");
@@ -232,6 +242,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return _upsertMode != UpsertConfig.Mode.NONE;
}
+ public boolean isPartialUpsertEnabled() {
+ return _upsertMode == UpsertConfig.Mode.PARTIAL;
+ }
+
/*
* This call comes in one of two ways:
* For HL Segments:
@@ -300,11 +314,12 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// The segment is uploaded to an upsert enabled realtime table. Download
the segment and load.
Preconditions.checkArgument(realtimeSegmentZKMetadata instanceof
LLCRealtimeSegmentZKMetadata,
"Upload segment is not LLC segment");
- String downURL =
((LLCRealtimeSegmentZKMetadata)realtimeSegmentZKMetadata).getDownloadUrl();
- Preconditions.checkNotNull(downURL, "Upload segment metadata has no
download url");
- downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, downURL);
- _logger.info("Downloaded, untarred and add segment {} of table {} from
{}", segmentName, tableConfig.getTableName(),
- downURL);
+ String downloadUrl = ((LLCRealtimeSegmentZKMetadata)
realtimeSegmentZKMetadata).getDownloadUrl();
+ Preconditions.checkNotNull(downloadUrl, "Upload segment metadata has no
download url");
+ downloadSegmentFromDeepStore(segmentName, indexLoadingConfig,
downloadUrl);
+ _logger
+ .info("Downloaded, untarred and add segment {} of table {} from {}",
segmentName, tableConfig.getTableName(),
+ downloadUrl);
return;
}
@@ -354,17 +369,20 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
+ String segmentName = immutableSegment.getSegmentName();
+ int partitionGroupId = SegmentUtils
+ .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType,
_helixManager, _primaryKeyColumns.get(0));
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
+ ThreadSafeMutableRoaringBitmap validDocIds = new
ThreadSafeMutableRoaringBitmap();
+ immutableSegment.enableUpsert(partitionUpsertMetadataManager, validDocIds);
+
Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
for (String primaryKeyColumn : _primaryKeyColumns) {
columnToReaderMap.put(primaryKeyColumn, new
PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
}
columnToReaderMap.put(_timeColumnName, new
PinotSegmentColumnReader(immutableSegment, _timeColumnName));
int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
- String segmentName = immutableSegment.getSegmentName();
- int partitionGroupId = SegmentUtils
- .getRealtimeSegmentPartitionId(segmentName, this.getTableName(),
_helixManager, _primaryKeyColumns.get(0));
- PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
int numPrimaryKeyColumns = _primaryKeyColumns.size();
Iterator<PartitionUpsertMetadataManager.RecordInfo> recordInfoIterator =
new Iterator<PartitionUpsertMetadataManager.RecordInfo>() {
@@ -392,9 +410,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return new PartitionUpsertMetadataManager.RecordInfo(primaryKey,
_docId++, timestamp);
}
};
- ThreadSafeMutableRoaringBitmap validDocIds =
- partitionUpsertMetadataManager.addSegment(segmentName,
recordInfoIterator);
- immutableSegment.enableUpsert(partitionUpsertMetadataManager, validDocIds);
+ partitionUpsertMetadataManager.addSegment(immutableSegment,
recordInfoIterator);
}
public void downloadAndReplaceSegment(String segmentName,
LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 2ad5c26..4418e1d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -49,9 +49,9 @@ import org.apache.pinot.core.util.QueryOptions;
import
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -71,23 +71,23 @@ public class FilterPlanNode implements PlanNode {
@Override
public BaseFilterOperator run() {
FilterContext filter = _queryContext.getFilter();
- ValidDocIndexReader validDocIndexReader = _indexSegment.getValidDocIndex();
+ ThreadSafeMutableRoaringBitmap validDocIds =
_indexSegment.getValidDocIds();
boolean upsertSkipped = false;
if (_queryContext.getQueryOptions() != null) {
upsertSkipped = new
QueryOptions(_queryContext.getQueryOptions()).isSkipUpsert();
}
if (filter != null) {
BaseFilterOperator filterOperator = constructPhysicalOperator(filter,
_queryContext.getDebugOptions());
- if (validDocIndexReader != null && !upsertSkipped) {
+ if (validDocIds != null && !upsertSkipped) {
BaseFilterOperator validDocFilter =
- new
BitmapBasedFilterOperator(validDocIndexReader.getValidDocBitmap(), false,
_numDocs);
+ new
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false,
_numDocs);
return
FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator,
validDocFilter), _numDocs,
_queryContext.getDebugOptions());
} else {
return filterOperator;
}
- } else if (validDocIndexReader != null && !upsertSkipped) {
- return new
BitmapBasedFilterOperator(validDocIndexReader.getValidDocBitmap(), false,
_numDocs);
+ } else if (validDocIds != null && !upsertSkipped) {
+ return new
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false,
_numDocs);
} else {
return new MatchAllFilterOperator(_numDocs);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 3f94d80..3fb5a2a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -175,7 +175,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
// Use metadata/dictionary to solve the query if possible
// NOTE: Skip the segment with valid doc index because the valid doc
index is equivalent to a filter.
- if (queryContext.getFilter() == null &&
indexSegment.getValidDocIndex() == null) {
+ if (queryContext.getFilter() == null && indexSegment.getValidDocIds()
== null) {
if (isFitForMetadataBasedPlan(queryContext)) {
return new MetadataBasedAggregationPlanNode(indexSegment,
queryContext);
} else if (isFitForDictionaryBasedPlan(queryContext, indexSegment)) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 0d40c97..7eea40b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -74,7 +74,7 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
}
// Skip pruning segments for upsert table because valid doc index is
equivalent to a filter
- if (segments.get(0).getValidDocIndex() != null) {
+ if (segments.get(0).getValidDocIds() != null) {
return segments;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 5f4aff4..6555bfa 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -47,7 +47,6 @@ import
org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -318,8 +317,8 @@ public class LLRealtimeSegmentDataManagerTest {
segmentDataManager._consumeOffsets.add(catchupOffset); // Offset after
catchup
final SegmentCompletionProtocol.Response holdResponse1 = new
SegmentCompletionProtocol.Response(
new SegmentCompletionProtocol.Response.Params()
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).
- withStreamPartitionMsgOffset(firstOffset.toString()));
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
+ .withStreamPartitionMsgOffset(firstOffset.toString()));
final SegmentCompletionProtocol.Response catchupResponse = new
SegmentCompletionProtocol.Response(
new SegmentCompletionProtocol.Response.Params()
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP)
@@ -382,8 +381,8 @@ public class LLRealtimeSegmentDataManagerTest {
final LongMsgOffset endOffset = new LongMsgOffset(_startOffsetValue + 500);
segmentDataManager._consumeOffsets.add(endOffset);
SegmentCompletionProtocol.Response.Params params = new
SegmentCompletionProtocol.Response.Params();
- params.withStreamPartitionMsgOffset(endOffset.toString()).
- withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
+ params.withStreamPartitionMsgOffset(endOffset.toString())
+ .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
final SegmentCompletionProtocol.Response keepResponse = new
SegmentCompletionProtocol.Response(params);
segmentDataManager._responses.add(keepResponse);
@@ -816,8 +815,7 @@ public class LLRealtimeSegmentDataManagerTest {
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(),
tableConfig), schema, llcSegmentName,
- semaphoreMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics,
- new PartitionUpsertMetadataManager("testTable_REALTIME", 0,
serverMetrics));
+ semaphoreMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics, null);
_state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
_shouldStop =
LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index b1e6e81..956ad31 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -36,12 +36,12 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -124,7 +124,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment)
- .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME",
0, serverMetrics),
+ .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME",
0, serverMetrics, null),
new ThreadSafeMutableRoaringBitmap());
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index d7a01b0..c7f7d57 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -217,8 +217,8 @@ public class SelectionQuerySegmentPrunerTest {
when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
if (upsert) {
- ValidDocIndexReader validDocIndex = mock(ValidDocIndexReader.class);
- when(indexSegment.getValidDocIndex()).thenReturn(validDocIndex);
+ ThreadSafeMutableRoaringBitmap validDocIds =
mock(ThreadSafeMutableRoaringBitmap.class);
+ when(indexSegment.getValidDocIds()).thenReturn(validDocIds);
}
return indexSegment;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index c5d6b6e..d9c56c5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -25,12 +25,12 @@ import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.index.datasource.EmptyDataSource;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
@@ -89,7 +89,7 @@ public class EmptyIndexSegment implements ImmutableSegment {
@Nullable
@Override
- public ValidDocIndexReader getValidDocIndex() {
+ public ThreadSafeMutableRoaringBitmap getValidDocIds() {
return null;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 653349e..4f110df 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -24,21 +24,19 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
-import
org.apache.pinot.segment.local.segment.index.readers.ValidDocIndexReaderImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -57,7 +55,6 @@ public class ImmutableSegmentImpl implements ImmutableSegment
{
// For upsert
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private ThreadSafeMutableRoaringBitmap _validDocIds;
- private ValidDocIndexReader _validDocIndex;
private PinotSegmentRecordReader _pinotSegmentRecordReader;
public ImmutableSegmentImpl(SegmentDirectory segmentDirectory,
SegmentMetadataImpl segmentMetadata,
@@ -76,7 +73,6 @@ public class ImmutableSegmentImpl implements ImmutableSegment
{
ThreadSafeMutableRoaringBitmap validDocIds) {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_validDocIds = validDocIds;
- _validDocIndex = new ValidDocIndexReaderImpl(validDocIds);
}
@Override
@@ -140,6 +136,11 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
public void destroy() {
String segmentName = getSegmentName();
LOGGER.info("Trying to destroy segment : {}", segmentName);
+
+ // Remove the upsert metadata before closing the readers
+ if (_partitionUpsertMetadataManager != null) {
+ _partitionUpsertMetadataManager.removeSegment(this);
+ }
for (Map.Entry<String, ColumnIndexContainer> entry :
_indexContainerMap.entrySet()) {
try {
entry.getValue().close();
@@ -159,9 +160,6 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
LOGGER.error("Failed to close star-tree. Continuing with error.", e);
}
}
- if (_partitionUpsertMetadataManager != null) {
- _partitionUpsertMetadataManager.removeSegment(segmentName, _validDocIds);
- }
if (_pinotSegmentRecordReader != null) {
try {
_pinotSegmentRecordReader.close();
@@ -178,8 +176,8 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
@Nullable
@Override
- public ValidDocIndexReader getValidDocIndex() {
- return _validDocIndex;
+ public ThreadSafeMutableRoaringBitmap getValidDocIds() {
+ return _validDocIds;
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
index ee0bea4..875811e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
@@ -42,9 +42,9 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.MutableDictionary;
import org.apache.pinot.segment.spi.index.reader.MutableForwardIndex;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
@@ -219,7 +219,7 @@ public class IntermediateSegment implements MutableSegment {
@Nullable
@Override
- public ValidDocIndexReader getValidDocIndex() {
+ public ThreadSafeMutableRoaringBitmap getValidDocIds() {
return null;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 6a09d63..96f71fe 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -40,7 +40,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import
org.apache.pinot.segment.local.io.readerwriter.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
import
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
import
org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory;
import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
@@ -54,7 +53,6 @@ import
org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndex;
import
org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
import
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
-import
org.apache.pinot.segment.local.segment.index.readers.ValidDocIndexReaderImpl;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -67,13 +65,13 @@ import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
import org.apache.pinot.segment.spi.index.reader.MutableDictionary;
import org.apache.pinot.segment.spi.index.reader.MutableForwardIndex;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -162,7 +160,6 @@ public class MutableSegmentImpl implements MutableSegment {
// consumption with newer timestamp (late event in consuming
segment), the record location will be updated, but
// the valid doc ids won't be updated.
private final ThreadSafeMutableRoaringBitmap _validDocIds;
- private final ValidDocIndexReader _validDocIndex;
public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable
ServerMetrics serverMetrics) {
_serverMetrics = serverMetrics;
@@ -373,13 +370,12 @@ public class MutableSegmentImpl implements MutableSegment
{
// init upsert-related data structure
_upsertMode = config.getUpsertMode();
if (isUpsertEnabled()) {
+ Preconditions.checkState(!_aggregateMetrics, "Metrics aggregation and
upsert cannot be enabled together");
_partitionUpsertMetadataManager =
config.getPartitionUpsertMetadataManager();
_validDocIds = new ThreadSafeMutableRoaringBitmap();
- _validDocIndex = new ValidDocIndexReaderImpl(_validDocIds);
} else {
_partitionUpsertMetadataManager = null;
_validDocIds = null;
- _validDocIndex = null;
}
}
@@ -470,28 +466,32 @@ public class MutableSegmentImpl implements MutableSegment
{
@Override
public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
throws IOException {
- // Update dictionary first
- updateDictionary(row);
-
- // If metrics aggregation is enabled and if the dimension values were
already seen, this will return existing docId,
- // else this will return a new docId.
- int docId = getOrCreateDocId();
-
boolean canTakeMore;
- if (docId == _numDocsIndexed) {
- // New row
+ if (isUpsertEnabled()) {
+ row = handleUpsert(row, _numDocsIndexed);
+
+ updateDictionary(row);
addNewRow(row);
// Update number of documents indexed at last to make the latest row
queryable
canTakeMore = _numDocsIndexed++ < _capacity;
-
- if (isUpsertEnabled()) {
- handleUpsert(row, docId);
- }
} else {
- Preconditions.checkArgument(!isUpsertEnabled(), "metrics aggregation
cannot be used with upsert");
- assert _aggregateMetrics;
- aggregateMetrics(row, docId);
- canTakeMore = true;
+ // Update dictionary first
+ updateDictionary(row);
+
+ // If metrics aggregation is enabled and if the dimension values were
already seen, this will return existing
+ // docId, else this will return a new docId.
+ int docId = getOrCreateDocId();
+
+ if (docId == _numDocsIndexed) {
+ // New row
+ addNewRow(row);
+ // Update number of documents indexed at last to make the latest row
queryable
+ canTakeMore = _numDocsIndexed++ < _capacity;
+ } else {
+ assert _aggregateMetrics;
+ aggregateMetrics(row, docId);
+ canTakeMore = true;
+ }
}
// Update last indexed time and latest ingestion time
@@ -507,14 +507,13 @@ public class MutableSegmentImpl implements MutableSegment
{
return _upsertMode != UpsertConfig.Mode.NONE;
}
- private void handleUpsert(GenericRow row, int docId) {
+ private GenericRow handleUpsert(GenericRow row, int docId) {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
Object timeValue = row.getValue(_timeColumnName);
Preconditions.checkArgument(timeValue instanceof Comparable, "time column
shall be comparable");
long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
- _partitionUpsertMetadataManager
- .updateRecord(_segmentName, new
PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp),
- _validDocIds);
+ return _partitionUpsertMetadataManager
+ .updateRecord(this, new
PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp), row);
}
private void updateDictionary(GenericRow row) {
@@ -758,8 +757,8 @@ public class MutableSegmentImpl implements MutableSegment {
@Nullable
@Override
- public ValidDocIndexReader getValidDocIndex() {
- return _validDocIndex;
+ public ThreadSafeMutableRoaringBitmap getValidDocIds() {
+ return _validDocIds;
}
@Override
@@ -937,6 +936,14 @@ public class MutableSegmentImpl implements MutableSegment {
return segmentName + ":" + columnName + indexType;
}
+ /**
+ * Helper function that returns a docId, depending on the following scenarios:
+ * <ul>
+ * <li> If metrics aggregation is enabled and the dimension values were
already seen, return the existing docId. </li>
+ * <li> Else, create and return a new docId. </li>
+ * </ul>
+ */
private int getOrCreateDocId() {
if (!_aggregateMetrics) {
return _numDocsIndexed;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
index b026093..7af31e7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.local.utils.H3Utils;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
import org.locationtech.jts.geom.Coordinate;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
index 98056b3..d2d3cf1 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
@@ -21,7 +21,7 @@ package
org.apache.pinot.segment.local.realtime.impl.invertedindex;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
index b3ff3e2..5558857 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.impl.nullvalue;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
index f09ab06..8a2089f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.segment.local.segment.index.readers;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
new file mode 100644
index 0000000..4822339
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler for partial-upsert.
+ */
+public class PartialUpsertHandler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartialUpsertHandler.class);
+
+ // _column2Mergers maintains the mapping of merge strategies per columns.
+ private final Map<String, PartialUpsertMerger> _column2Mergers = new
HashMap<>();
+
+ private final HelixManager _helixManager;
+ private final String _tableNameWithType;
+ private boolean _allSegmentsLoaded;
+
+ public PartialUpsertHandler(HelixManager helixManager, String
tableNameWithType,
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies) {
+ _helixManager = helixManager;
+ _tableNameWithType = tableNameWithType;
+ for (Map.Entry<String, UpsertConfig.Strategy> entry :
partialUpsertStrategies.entrySet()) {
+ _column2Mergers.put(entry.getKey(),
PartialUpsertMergerFactory.getMerger(entry.getValue()));
+ }
+ }
+
+ /**
+ * Returns {@code true} if all segments assigned to the current instance are
loaded, {@code false} otherwise.
+ * Consuming segment should perform this check to ensure all previous
records are loaded before inserting new records.
+ */
+ public synchronized boolean isAllSegmentsLoaded() {
+ if (_allSegmentsLoaded) {
+ return true;
+ }
+
+ HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+ IdealState idealState =
dataAccessor.getProperty(keyBuilder.idealStates(_tableNameWithType));
+ if (idealState == null) {
+ LOGGER.warn("Failed to find ideal state for table: {}",
_tableNameWithType);
+ return false;
+ }
+ String instanceName = _helixManager.getInstanceName();
+ LiveInstance liveInstance =
dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+ if (liveInstance == null) {
+ LOGGER.warn("Failed to find live instance for instance: {}",
instanceName);
+ return false;
+ }
+ String sessionId = liveInstance.getEphemeralOwner();
+ CurrentState currentState =
+ dataAccessor.getProperty(keyBuilder.currentState(instanceName,
sessionId, _tableNameWithType));
+ if (currentState == null) {
+ LOGGER.warn("Failed to find current state for instance: {}, sessionId:
{}, table: {}", instanceName, sessionId,
+ _tableNameWithType);
+ return false;
+ }
+
+ // Check if ideal state and current state matches for all segments
assigned to the current instance
+ Map<String, Map<String, String>> idealStatesMap =
idealState.getRecord().getMapFields();
+ Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+ for (Map.Entry<String, Map<String, String>> entry :
idealStatesMap.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ String expectedState = instanceStateMap.get(instanceName);
+ // Only track ONLINE segments assigned to the current instance
+ if (!SegmentStateModel.ONLINE.equals(expectedState)) {
+ continue;
+ }
+ String actualState = currentStateMap.get(segmentName);
+ if (!SegmentStateModel.ONLINE.equals(actualState)) {
+ if (SegmentStateModel.ERROR.equals(actualState)) {
+ LOGGER
+ .error("Find ERROR segment: {}, table: {}, expected: {}",
segmentName, _tableNameWithType, expectedState);
+ } else {
+ LOGGER.info("Find unloaded segment: {}, table: {}, expected: {},
actual: {}", segmentName, _tableNameWithType,
+ expectedState, actualState);
+ }
+ return false;
+ }
+ }
+
+ LOGGER.info("All segments loaded for table: {}", _tableNameWithType);
+ _allSegmentsLoaded = true;
+ return true;
+ }
+
+ /**
+ * Merges 2 records and returns the merged record.
+ *
+ * @param previousRecord the last derived full record during ingestion.
+ * @param newRecord the new consumed record.
+ * @return a new row after merge
+ */
+ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
+ for (Map.Entry<String, PartialUpsertMerger> entry :
_column2Mergers.entrySet()) {
+ String column = entry.getKey();
+ if (!previousRecord.isNullValue(column)) {
+ if (newRecord.isNullValue(column)) {
+ newRecord.putValue(column, previousRecord.getValue(column));
+ newRecord.removeNullValueField(column);
+ } else {
+ newRecord
+ .putValue(column,
entry.getValue().merge(previousRecord.getValue(column),
newRecord.getValue(column)));
+ }
+ }
+ }
+ return newRecord;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 8e02f85..335d658 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -21,11 +21,14 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,92 +63,117 @@ public class PartitionUpsertMetadataManager {
private final String _tableNameWithType;
private final int _partitionId;
private final ServerMetrics _serverMetrics;
+ private final PartialUpsertHandler _partialUpsertHandler;
- public PartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, ServerMetrics serverMetrics) {
+ // TODO(upsert): consider an off-heap KV store to persist this mapping to
improve the recovery speed.
+ @VisibleForTesting
+ final ConcurrentHashMap<PrimaryKey, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+ // Reused for reading previous record during partial upsert
+ private final GenericRow _reuse = new GenericRow();
+ // Stores the result of updateRecord()
+ private GenericRow _result;
+
+ public PartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, ServerMetrics serverMetrics,
+ @Nullable PartialUpsertHandler partialUpsertHandler) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_serverMetrics = serverMetrics;
+ _partialUpsertHandler = partialUpsertHandler;
}
- public ConcurrentHashMap<PrimaryKey, RecordLocation>
getPrimaryKeyToRecordLocationMap() {
- return _primaryKeyToRecordLocationMap;
- }
-
- // TODO(upset): consider an off-heap KV store to persist this index to
improve the recovery speed.
- @VisibleForTesting
- final ConcurrentHashMap<PrimaryKey, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
/**
- * Initializes the upsert metadata for the given immutable segment, returns
the valid doc ids for the segment.
+ * Initializes the upsert metadata for the given immutable segment.
*/
- public ThreadSafeMutableRoaringBitmap addSegment(String segmentName,
Iterator<RecordInfo> recordInfoIterator) {
+ public void addSegment(IndexSegment segment, Iterator<RecordInfo>
recordInfoIterator) {
+ String segmentName = segment.getSegmentName();
LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+ ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
+ assert validDocIds != null;
- ThreadSafeMutableRoaringBitmap validDocIds = new
ThreadSafeMutableRoaringBitmap();
while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
_primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey,
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
- if (segmentName.equals(currentRecordLocation.getSegmentName())) {
- // The current record location has the same segment name
-
- // Update the record location when the new timestamp is greater
than or equal to the current timestamp.
- // There are 2 scenarios:
- // 1. The current record location is pointing to the same
segment (the segment being added). In this case,
- // we want to update the record location when there is a tie
to keep the newer record. Note that the
- // record info iterator will return records with incremental
doc ids.
- // 2. The current record location is pointing to the old segment
being replaced. This could happen when
- // committing a consuming segment, or reloading a completed
segment. In this case, we want to update
- // the record location when there is a tie because the record
locations should point to the new added
- // segment instead of the old segment being replaced. Also,
do not update the valid doc ids for the old
- // segment because it has not been replaced yet.
+ // The current record is in the same segment
+ // Update the record location when there is a tie to keep the newer
record. Note that the record info iterator
+ // will return records with incremental doc ids.
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
+ if (segment == currentSegment) {
if (recordInfo._timestamp >= currentRecordLocation.getTimestamp())
{
- // Only update the valid doc ids for the new segment
- if (validDocIds == currentRecordLocation.getValidDocIds()) {
- validDocIds.remove(currentRecordLocation.getDocId());
- }
+ validDocIds.remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
+ return new RecordLocation(segment, recordInfo._docId,
recordInfo._timestamp);
} else {
return currentRecordLocation;
}
- } else {
- // The current record location is pointing to a different segment
-
- // Update the record location when getting a newer timestamp, or
the timestamp is the same as the current
- // timestamp, but the segment has a larger sequence number (the
segment is newer than the current segment).
- if (recordInfo._timestamp > currentRecordLocation.getTimestamp()
|| (
- recordInfo._timestamp == currentRecordLocation.getTimestamp()
- &&
LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
- &&
LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
- && LLCSegmentName.getSequenceNumber(segmentName) >
LLCSegmentName
-
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+ }
+
+ // The current record is in an old segment being replaced
+ // This could happen when committing a consuming segment, or
reloading a completed segment. In this case, we
+ // want to update the record location when there is a tie because
the record locations should point to the new
+ // added segment instead of the old segment being replaced. Also, do
not update the valid doc ids for the old
+ // segment because it has not been replaced yet.
+ String currentSegmentName = currentSegment.getSegmentName();
+ if (segmentName.equals(currentSegmentName)) {
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp())
{
validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
+ return new RecordLocation(segment, recordInfo._docId,
recordInfo._timestamp);
} else {
return currentRecordLocation;
}
}
+
+ // The current record is in a different segment
+ // Update the record location when getting a newer timestamp, or the
timestamp is the same as the current
+ // timestamp, but the segment has a larger sequence number (the
segment is newer than the current segment).
+ if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+ recordInfo._timestamp == currentRecordLocation.getTimestamp() &&
LLCSegmentName
+ .isLowLevelConsumerSegmentName(segmentName) && LLCSegmentName
+ .isLowLevelConsumerSegmentName(currentSegmentName)
+ && LLCSegmentName.getSequenceNumber(segmentName) >
LLCSegmentName
+ .getSequenceNumber(currentSegmentName))) {
+ assert currentSegment.getValidDocIds() != null;
+
currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+ validDocIds.add(recordInfo._docId);
+ return new RecordLocation(segment, recordInfo._docId,
recordInfo._timestamp);
+ } else {
+ return currentRecordLocation;
+ }
} else {
// New primary key
validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
+ return new RecordLocation(segment, recordInfo._docId,
recordInfo._timestamp);
}
});
}
// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
- return validDocIds;
}
/**
- * Updates the upsert metadata for a new consumed record in the given
consuming segment.
+ * Updates the upsert metadata for a new consumed record in the given
consuming segment. Returns the merged record if
+ * partial-upsert is enabled.
*/
- public void updateRecord(String segmentName, RecordInfo recordInfo,
ThreadSafeMutableRoaringBitmap validDocIds) {
+ public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo,
GenericRow record) {
+ // For partial-upsert, need to ensure all previous records are loaded
before inserting new records.
+ if (_partialUpsertHandler != null) {
+ while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+ LOGGER
+ .info("Sleeping 1 second waiting for all segments loaded for
partial-upsert table: {}", _tableNameWithType);
+ try {
+ //noinspection BusyWait
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ _result = record;
_primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey,
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
@@ -153,35 +181,52 @@ public class PartitionUpsertMetadataManager {
// Update the record location when the new timestamp is greater than
or equal to the current timestamp. Update
// the record location when there is a tie to keep the newer record.
if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
- validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
+ if (_partialUpsertHandler != null) {
+ // Partial upsert
+ GenericRow previousRecord =
currentSegment.getRecord(currentRecordLocation.getDocId(), _reuse);
+ _result = _partialUpsertHandler.merge(previousRecord, record);
+ }
+ assert currentSegment.getValidDocIds() != null;
+
currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+ assert segment.getValidDocIds() != null;
+ segment.getValidDocIds().add(recordInfo._docId);
+ return new RecordLocation(segment, recordInfo._docId,
recordInfo._timestamp);
} else {
+ if (_partialUpsertHandler != null) {
+ LOGGER.warn(
+ "Got late event for partial upsert: {} (current timestamp: {},
record timestamp: {}), skipping updating the record",
+ record, currentRecordLocation.getTimestamp(),
recordInfo._timestamp);
+ }
return currentRecordLocation;
}
} else {
// New primary key
- validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
+ assert segment.getValidDocIds() != null;
+ segment.getValidDocIds().add(recordInfo._docId);
+ return new RecordLocation(segment, recordInfo._docId,
recordInfo._timestamp);
}
});
// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
+ return _result;
}
/**
* Removes the upsert metadata for the given immutable segment. No need to
remove the upsert metadata for the
* consuming segment because it should be replaced by the committed segment.
*/
- public void removeSegment(String segmentName, ThreadSafeMutableRoaringBitmap
validDocIds) {
+ public void removeSegment(IndexSegment segment) {
+ String segmentName = segment.getSegmentName();
LOGGER.info("Removing upsert metadata for segment: {}", segmentName);
- if (!validDocIds.getMutableRoaringBitmap().isEmpty()) {
- // Remove all the record locations that point to the valid doc ids of
the removed segment.
+ assert segment.getValidDocIds() != null;
+ if (!segment.getValidDocIds().getMutableRoaringBitmap().isEmpty()) {
+ // Remove all the record locations that point to the removed segment
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
- if (recordLocation.getValidDocIds() == validDocIds) {
- // Check and remove to prevent removing the key that is just updated.
+ if (recordLocation.getSegment() == segment) {
+ // Check and remove to prevent removing the key that is just updated
_primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
}
});
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
index b02cfd3..038fb23 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
@@ -18,27 +18,25 @@
*/
package org.apache.pinot.segment.local.upsert;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.IndexSegment;
/**
* Indicate a record's location on the local host.
*/
public class RecordLocation {
- private final String _segmentName;
+ private final IndexSegment _segment;
private final int _docId;
private final long _timestamp;
- private final ThreadSafeMutableRoaringBitmap _validDocIds;
- public RecordLocation(String segmentName, int docId, long timestamp,
ThreadSafeMutableRoaringBitmap validDocIds) {
- _segmentName = segmentName;
+ public RecordLocation(IndexSegment indexSegment, int docId, long timestamp) {
+ _segment = indexSegment;
_docId = docId;
_timestamp = timestamp;
- _validDocIds = validDocIds;
}
- public String getSegmentName() {
- return _segmentName;
+ public IndexSegment getSegment() {
+ return _segment;
}
public int getDocId() {
@@ -48,8 +46,4 @@ public class RecordLocation {
public long getTimestamp() {
return _timestamp;
}
-
- public ThreadSafeMutableRoaringBitmap getValidDocIds() {
- return _validDocIds;
- }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index a43a3b2..ae8fb3f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -32,14 +33,17 @@ public class TableUpsertMetadataManager {
private final Map<Integer, PartitionUpsertMetadataManager>
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
private final String _tableNameWithType;
private final ServerMetrics _serverMetrics;
+ private final PartialUpsertHandler _partialUpsertHandler;
- public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics
serverMetrics) {
+ public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics
serverMetrics,
+ @Nullable PartialUpsertHandler partialUpsertHandler) {
_tableNameWithType = tableNameWithType;
_serverMetrics = serverMetrics;
+ _partialUpsertHandler = partialUpsertHandler;
}
public PartitionUpsertMetadataManager getOrCreatePartitionManager(int
partitionId) {
- return _partitionMetadataManagerMap
- .computeIfAbsent(partitionId, k -> new
PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics));
+ return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
+ k -> new PartitionUpsertMetadataManager(_tableNameWithType, k,
_serverMetrics, _partialUpsertHandler));
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
similarity index 54%
copy from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
index f09ab06..6d38ae2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
@@ -16,22 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.local.segment.index.readers;
+package org.apache.pinot.segment.local.upsert.merger;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
-
-
-public class ValidDocIndexReaderImpl implements ValidDocIndexReader {
- private final ThreadSafeMutableRoaringBitmap _validDocBitmap;
-
- public ValidDocIndexReaderImpl(ThreadSafeMutableRoaringBitmap
validDocBitmap) {
- _validDocBitmap = validDocBitmap;
+public class IncrementMerger implements PartialUpsertMerger {
+ IncrementMerger() {
}
+ /**
+ * Adds the value from the incoming record to the existing value of the
given field in the previous record.
+ */
@Override
- public ImmutableRoaringBitmap getValidDocBitmap() {
- return _validDocBitmap.getMutableRoaringBitmap();
+ public Object merge(Object previousValue, Object currentValue) {
+ return addNumbers((Number) previousValue, (Number) currentValue);
+ }
+
+ private static Number addNumbers(Number a, Number b) {
+ if (a instanceof Integer) {
+ return (Integer) a + (Integer) b;
+ } else if (a instanceof Long) {
+ return (Long) a + (Long) b;
+ } else if (a instanceof Float) {
+ return (Float) a + (Float) b;
+ } else {
+ return (Double) a + (Double) b;
+ }
}
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
similarity index 59%
copy from
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
index 9664a8e..a317d45 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
@@ -16,27 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.config.table;
+package org.apache.pinot.segment.local.upsert.merger;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-
-public class UpsertConfigTest {
-
- @Test
- public void testUpsertConfig() {
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
+public class OverwriteMerger implements PartialUpsertMerger {
+ OverwriteMerger() {
+ }
- // Test illegal arguments
- try {
- new UpsertConfig(UpsertConfig.Mode.PARTIAL);
- fail();
- } catch (IllegalArgumentException e) {
- // Expected
- }
+ @Override
+ public Object merge(Object previousValue, Object currentValue) {
+ return currentValue;
}
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
similarity index 59%
copy from
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
index 9664a8e..817d953 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
@@ -16,27 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.config.table;
+package org.apache.pinot.segment.local.upsert.merger;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-
-public class UpsertConfigTest {
-
- @Test
- public void testUpsertConfig() {
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
-
- // Test illegal arguments
- try {
- new UpsertConfig(UpsertConfig.Mode.PARTIAL);
- fail();
- } catch (IllegalArgumentException e) {
- // Expected
- }
- }
+public interface PartialUpsertMerger {
+ /**
+ * Handle partial upsert merge.
+ *
+ * @param previousValue the value of given field from the last derived full
record during ingestion.
+ * @param currentValue the value of given field from the new consumed record.
+ * @return a new value after merge
+ */
+ Object merge(Object previousValue, Object currentValue);
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
similarity index 54%
copy from
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 9664a8e..36112c4 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -16,27 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.config.table;
+package org.apache.pinot.segment.local.upsert.merger;
-import org.testng.annotations.Test;
+import org.apache.pinot.spi.config.table.UpsertConfig;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+public class PartialUpsertMergerFactory {
+ private PartialUpsertMergerFactory() {
+ }
-public class UpsertConfigTest {
-
- @Test
- public void testUpsertConfig() {
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
+ private static final OverwriteMerger OVERWRITE_MERGER = new
OverwriteMerger();
+ private static final IncrementMerger INCREMENT_MERGER = new
IncrementMerger();
- // Test illegal arguments
- try {
- new UpsertConfig(UpsertConfig.Mode.PARTIAL);
- fail();
- } catch (IllegalArgumentException e) {
- // Expected
+ public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
+ switch (strategy) {
+ case OVERWRITE:
+ return OVERWRITE_MERGER;
+ case INCREMENT:
+ return INCREMENT_MERGER;
+ default:
+ throw new IllegalStateException("Unsupported partial upsert strategy:
" + strategy);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 65cf5f9..ce88c15 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -103,6 +103,7 @@ public final class TableConfigUtils {
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
validateFieldConfigList(tableConfig.getFieldConfigList(),
tableConfig.getIndexingConfig(), schema);
validateUpsertConfig(tableConfig, schema);
+ validatePartialUpsertStrategies(tableConfig, schema);
validateTaskConfigs(tableConfig);
}
@@ -324,7 +325,7 @@ public final class TableConfigUtils {
* - consumer type must be low-level
*/
@VisibleForTesting
- public static void validateUpsertConfig(TableConfig tableConfig, Schema
schema) {
+ static void validateUpsertConfig(TableConfig tableConfig, Schema schema) {
if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
return;
}
@@ -351,6 +352,43 @@ public final class TableConfigUtils {
}
/**
+ * Validates the partial upsert-related configurations:
+ * - Null handling must be enabled
+ * - Merger cannot be applied to primary key columns
+ * - Merger cannot be applied to non-existing columns
+ * - INCREMENT merger must be applied to numeric columns
+ */
+ @VisibleForTesting
+ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema
schema) {
+ if (tableConfig.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {
+ return;
+ }
+
+
Preconditions.checkState(tableConfig.getIndexingConfig().isNullHandlingEnabled(),
+ "Null handling must be enabled for partial upsert tables");
+
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ assert upsertConfig != null;
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies =
upsertConfig.getPartialUpsertStrategies();
+
+ List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
+ for (Map.Entry<String, UpsertConfig.Strategy> entry :
partialUpsertStrategies.entrySet()) {
+ String column = entry.getKey();
+ Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger
cannot be applied to primary key columns");
+
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to
non-existing column: %s", column);
+
+ if (entry.getValue() == UpsertConfig.Strategy.INCREMENT) {
+
Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(),
+ "INCREMENT merger cannot be applied to non-numeric column: %s",
column);
+ Preconditions.checkState(!schema.getDateTimeNames().contains(column),
+ "INCREMENT merger cannot be applied to date time column: %s",
column);
+ }
+ }
+ }
+
+ /**
* Validates the tier configs
* Checks for the right segmentSelectorType and its required properties
* Checks for the right storageType and its required properties
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 2e5e6b3..1372cfa 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -61,7 +61,7 @@ public class MutableSegmentImplUpsertTest {
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
- new TableUpsertMetadataManager("testTable_REALTIME",
Mockito.mock(ServerMetrics.class))
+ new TableUpsertMetadataManager("testTable_REALTIME",
Mockito.mock(ServerMetrics.class), null)
.getOrCreatePartitionManager(0);
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
@@ -81,7 +81,7 @@ public class MutableSegmentImplUpsertTest {
@Test
public void testUpsertIngestion() {
- ImmutableRoaringBitmap bitmap =
_mutableSegmentImpl.getValidDocIndex().getValidDocBitmap();
+ ImmutableRoaringBitmap bitmap =
_mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
Assert.assertFalse(bitmap.contains(0));
Assert.assertTrue(bitmap.contains(1));
Assert.assertTrue(bitmap.contains(2));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
new file mode 100644
index 0000000..5335d51
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class PartialUpsertHandlerTest {
+
+ @Test
+ public void testMerge() {
+ HelixManager helixManager = Mockito.mock(HelixManager.class);
+ String realtimeTableName = "testTable_REALTIME";
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new
HashMap<>();
+ partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
+ PartialUpsertHandler handler = new PartialUpsertHandler(helixManager,
realtimeTableName, partialUpsertStrategies);
+
+ // both records carry the null default value for field1.
+ GenericRow previousRecord = new GenericRow();
+ GenericRow incomingRecord = new GenericRow();
+
+ previousRecord.putDefaultNullValue("field1", 1);
+ incomingRecord.putDefaultNullValue("field1", 2);
+ GenericRow newRecord = handler.merge(previousRecord, incomingRecord);
+ assertTrue(newRecord.isNullValue("field1"));
+ assertEquals(newRecord.getValue("field1"), 2);
+
+ // previousRecord is null default value, while newRecord is not.
+ previousRecord.clear();
+ incomingRecord.clear();
+ previousRecord.putDefaultNullValue("field1", 1);
+ incomingRecord.putValue("field1", 2);
+ newRecord = handler.merge(previousRecord, incomingRecord);
+ assertFalse(newRecord.isNullValue("field1"));
+ assertEquals(newRecord.getValue("field1"), 2);
+
+ // newRecord is default null value, while previousRecord is not.
+ previousRecord.clear();
+ incomingRecord.clear();
+ previousRecord.putValue("field1", 1);
+ incomingRecord.putDefaultNullValue("field1", 2);
+ newRecord = handler.merge(previousRecord, incomingRecord);
+ assertFalse(newRecord.isNullValue("field1"));
+ assertEquals(newRecord.getValue("field1"), 1);
+
+ // neither of records is null.
+ previousRecord.clear();
+ incomingRecord.clear();
+ previousRecord.putValue("field1", 1);
+ incomingRecord.putValue("field1", 2);
+ newRecord = handler.merge(previousRecord, incomingRecord);
+ assertFalse(newRecord.isNullValue("field1"));
+ assertEquals(newRecord.getValue("field1"), 3);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
similarity index 76%
rename from
pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
rename to
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index 65a45ed..eefa175 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -16,21 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.upsert;
+package org.apache.pinot.segment.local.upsert;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
-import
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.RecordLocation;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -44,11 +46,10 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testAddSegment() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Mockito.mock(ServerMetrics.class));
- Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null);
+ Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
- String segment1 = getSegmentName(1);
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new
ArrayList<>();
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
@@ -56,8 +57,9 @@ public class PartitionUpsertMetadataManagerTest {
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 80));
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, 120));
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, 100));
- ThreadSafeMutableRoaringBitmap validDocIds1 =
- upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
+ upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
@@ -65,15 +67,15 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
// Add the second segment
- String segment2 = getSegmentName(2);
ArrayList<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new
ArrayList<>();
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 120));
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, 80));
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, 80));
- ThreadSafeMutableRoaringBitmap validDocIds2 =
- upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
+ upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
@@ -84,31 +86,40 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
// Replace (reload) the first segment
- ThreadSafeMutableRoaringBitmap newValidDocIds1 =
- upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+ ThreadSafeMutableRoaringBitmap newValidDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl newSegment1 = mockSegment(1, newValidDocIds1);
+ upsertMetadataManager.addSegment(newSegment1, recordInfoList1.iterator());
// original segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
- assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(),
newValidDocIds1);
+ assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(),
newSegment1);
// Remove the original segment1
- upsertMetadataManager.removeSegment(segment1, validDocIds1);
+ upsertMetadataManager.removeSegment(segment1);
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
- assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(),
newValidDocIds1);
+ assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(),
newSegment1);
+ }
+
+ private static ImmutableSegmentImpl mockSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds) {
+ ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+ String segmentName = getSegmentName(sequenceNumber);
+ when(segment.getSegmentName()).thenReturn(segmentName);
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+ return segment;
}
private static String getSegmentName(int sequenceNumber) {
@@ -120,10 +131,10 @@ public class PartitionUpsertMetadataManagerTest {
}
private static void checkRecordLocation(Map<PrimaryKey, RecordLocation>
recordLocationMap, int keyValue,
- String segmentName, int docId, long timestamp) {
+ IndexSegment segment, int docId, long timestamp) {
RecordLocation recordLocation =
recordLocationMap.get(getPrimaryKey(keyValue));
assertNotNull(recordLocation);
- assertEquals(recordLocation.getSegmentName(), segmentName);
+ assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
assertEquals(recordLocation.getTimestamp(), timestamp);
}
@@ -131,25 +142,26 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testUpdateRecord() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Mockito.mock(ServerMetrics.class));
- Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null);
+ Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
- String segment1 = getSegmentName(1);
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new
ArrayList<>();
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 120));
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
- ThreadSafeMutableRoaringBitmap validDocIds1 =
- upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
+ upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
// Update records from the second segment
- String segment2 = getSegmentName(2);
ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ IndexSegment segment2 = mockSegment(2, validDocIds2);
+ GenericRow row = mock(GenericRow.class);
upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100),
validDocIds2);
+ .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
@@ -160,7 +172,7 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0});
upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120),
validDocIds2);
+ .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
@@ -171,7 +183,7 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100),
validDocIds2);
+ .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
@@ -182,7 +194,7 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100),
validDocIds2);
+ .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row);
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
@@ -196,27 +208,27 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testRemoveSegment() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Mockito.mock(ServerMetrics.class));
- Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null);
+ Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add 2 segments
// segment1: 0 -> {0, 100}, 1 -> {1, 100}
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
- String segment1 = getSegmentName(1);
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new
ArrayList<>();
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
- ThreadSafeMutableRoaringBitmap validDocIds1 =
- upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- String segment2 = getSegmentName(2);
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
+ upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new
ArrayList<>();
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, 100));
recordInfoList2.add(new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, 100));
- ThreadSafeMutableRoaringBitmap validDocIds2 =
- upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
+ upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
// Remove the first segment
- upsertMetadataManager.removeSegment(segment1, validDocIds1);
+ upsertMetadataManager.removeSegment(segment1);
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
assertNull(recordLocationMap.get(getPrimaryKey(0)));
assertNull(recordLocationMap.get(getPrimaryKey(1)));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
similarity index 90%
rename from
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
rename to
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b3620a9..6d76c88 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.util;
+package org.apache.pinot.segment.local.utils;
import com.google.common.collect.Lists;
import java.util.Arrays;
@@ -25,8 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.tier.TierFactory;
-import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -243,7 +241,8 @@ public class TableConfigUtilsTest {
// valid filterFunction
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, new
FilterConfig("Groovy({x == 10}, x)"), null, null)).build();
+ .setIngestionConfig(new IngestionConfig(null, null, new
FilterConfig("Groovy({x == 10}, x)"), null, null))
+ .build();
TableConfigUtils.validate(tableConfig, schema);
// invalid filter function
@@ -272,8 +271,8 @@ public class TableConfigUtilsTest {
// transformed column not in schema
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)")), null))
- .build();
+ new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)")),
+ null)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for transformedColumn not present in schema");
@@ -286,8 +285,8 @@ public class TableConfigUtilsTest {
.build();
// valid transform configs
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)")), null))
- .build();
+ new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)")),
+ null)).build();
TableConfigUtils.validate(tableConfig, schema);
schema =
@@ -301,8 +300,8 @@ public class TableConfigUtilsTest {
// null transform column name
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig(null, "reverse(anotherCol)")), null))
- .build();
+ new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig(null, "reverse(anotherCol)")),
+ null)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for null column name in transform config");
@@ -322,8 +321,8 @@ public class TableConfigUtilsTest {
// invalid function
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "fakeFunction(col)")), null))
- .build();
+ new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "fakeFunction(col)")),
+ null)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid transform function in transform
config");
@@ -333,8 +332,8 @@ public class TableConfigUtilsTest {
// invalid function
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "Groovy(badExpr)")), null))
- .build();
+ new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "Groovy(badExpr)")),
+ null)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid transform function in transform
config");
@@ -367,8 +366,8 @@ public class TableConfigUtilsTest {
// duplicate transform config
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
new IngestionConfig(null, null, null,
- Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new
TransformConfig("myCol", "lower(y)")), null))
- .build();
+ Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new
TransformConfig("myCol", "lower(y)")),
+ null)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail due to duplicate transform config");
@@ -385,9 +384,10 @@ public class TableConfigUtilsTest {
@Test
public void ingestionStreamConfigsTest() {
- Map<String, String> fakeMap =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+ Map<String, String> streamConfigs = getStreamConfigs();
IngestionConfig ingestionConfig =
- new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(fakeMap, fakeMap)), null, null, null);
+ new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(streamConfigs, streamConfigs)), null,
+ null, null);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
@@ -401,15 +401,16 @@ public class TableConfigUtilsTest {
}
// stream config should be valid
- ingestionConfig = new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(fakeMap)), null, null, null);
+ ingestionConfig =
+ new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null);
tableConfig.setIngestionConfig(ingestionConfig);
TableConfigUtils.validateIngestionConfig(tableConfig, null);
- fakeMap.remove(StreamConfigProperties.STREAM_TYPE);
+ streamConfigs.remove(StreamConfigProperties.STREAM_TYPE);
try {
TableConfigUtils.validateIngestionConfig(tableConfig, null);
Assert.fail("Should fail for invalid stream configs map");
- } catch (Exception e) {
+ } catch (IllegalStateException e) {
// expected
}
}
@@ -724,7 +725,8 @@ public class TableConfigUtilsTest {
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
try {
FieldConfig fieldConfig =
- new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null,
FieldConfig.CompressionCodec.SNAPPY, null);
+ new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null,
FieldConfig.CompressionCodec.SNAPPY,
+ null);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since dictionary encoding does not support
compression codec snappy");
@@ -735,7 +737,8 @@ public class TableConfigUtilsTest {
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
try {
FieldConfig fieldConfig =
- new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null,
FieldConfig.CompressionCodec.ZSTANDARD, null);
+ new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null,
FieldConfig.CompressionCodec.ZSTANDARD,
+ null);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since dictionary encoding does not support
compression codec zstandard");
@@ -899,8 +902,8 @@ public class TableConfigUtilsTest {
// expected
}
- starTreeIndexConfig =
- new StarTreeIndexConfig(Arrays.asList("multiValCol"),
Arrays.asList("multiValCol"), Arrays.asList("SUM__multiValCol"), 1);
+ starTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("multiValCol"), Arrays.asList("multiValCol"),
+ Arrays.asList("SUM__multiValCol"), 1);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build();
try {
@@ -948,8 +951,9 @@ public class TableConfigUtilsTest {
// expected
}
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setJsonIndexColumns(Arrays.asList("intCol")).build();
+ tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setJsonIndexColumns(Arrays.asList("intCol"))
+ .build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for Json index defined on non string column");
@@ -966,8 +970,8 @@ public class TableConfigUtilsTest {
// expected
}
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(columnList).
- build();
+ tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(columnList).build();
try {
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
@@ -975,8 +979,7 @@ public class TableConfigUtilsTest {
}
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(columnList)
- .setNoDictionaryColumns(columnList).
- build();
+ .setNoDictionaryColumns(columnList).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for range index defined on non
numeric/no-dictionary column");
@@ -984,8 +987,8 @@ public class TableConfigUtilsTest {
// Expected
}
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setVarLengthDictionaryColumns(Arrays.asList("intCol")).
- build();
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setVarLengthDictionaryColumns(Arrays.asList("intCol")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for Var length dictionary defined for non
string/bytes column");
@@ -993,8 +996,8 @@ public class TableConfigUtilsTest {
// expected
}
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setJsonIndexColumns(Arrays.asList("multiValCol")).
- build();
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setJsonIndexColumns(Arrays.asList("multiValCol")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for Json Index defined on a multi value
column");
@@ -1036,56 +1039,58 @@ public class TableConfigUtilsTest {
.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "Upsert table is for realtime table
only.");
}
+
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "Upsert table must have primary key
columns in the schema");
}
+
schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
Assert
.assertEquals(e.getMessage(), "Could not find streamConfigs for
REALTIME table: " + TABLE_NAME + "_REALTIME");
}
- Map<String, String> streamConfigs = new HashMap<>();
- streamConfigs.put("stream.kafka.consumer.type", "highLevel");
- streamConfigs.put("streamType", "kafka");
- streamConfigs.put("stream.kafka.topic.name", "test");
- streamConfigs
- .put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+
+ Map<String, String> streamConfigs = getStreamConfigs();
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "Upsert table must use low-level
streaming consumer type");
}
+
streamConfigs.put("stream.kafka.consumer.type", "simple");
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(),
"Upsert table must use strict replica-group (i.e.
strictReplicaGroup) based routing");
}
+
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
- try {
- TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
- Assert.fail("Should not fail upsert validation");
- }
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections
.singletonList(new
AggregationFunctionColumnPair(AggregationFunctionType.COUNT,
"myCol").toColumnName()), 10);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
@@ -1094,8 +1099,80 @@ public class TableConfigUtilsTest {
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- } catch (Exception e) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "The upsert table cannot have
star-tree index.");
}
}
+
+ @Test
+ public void testValidatePartialUpsertConfig() {
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1",
FieldSpec.DataType.LONG)
+ .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+ .addDateTime("myTimeCol", FieldSpec.DataType.LONG, "1:DAYS:EPOCH",
"1:DAYS")
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+ Map<String, String> streamConfigs = getStreamConfigs();
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+ Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
+ partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies))
+ .setNullHandlingEnabled(false)
+ .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setStreamConfigs(streamConfigs).build();
+ try {
+ TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "Null handling must be enabled for
partial upsert tables");
+ }
+
+ tableConfig.getIndexingConfig().setNullHandlingEnabled(true);
+ try {
+ TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "Merger cannot be applied to primary
key columns");
+ }
+
+ partialUpsertStratgies.clear();
+ partialUpsertStratgies.put("randomCol", UpsertConfig.Strategy.OVERWRITE);
+ try {
+ TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "Merger cannot be applied to
non-existing column: randomCol");
+ }
+
+ partialUpsertStratgies.clear();
+ partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied
to non-numeric column: myCol2");
+ }
+
+ partialUpsertStratgies.clear();
+ partialUpsertStratgies.put("myTimeCol", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied
to date time column: myTimeCol");
+ }
+ }
+
+ private Map<String, String> getStreamConfigs() {
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs
+ .put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ return streamConfigs;
+ }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index 09ad60f..da2517d 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -74,7 +74,7 @@ public interface IndexSegment {
// TODO(upsert): solve the coordination problems of getting validDoc across
segments for result consistency
@Nullable
- ValidDocIndexReader getValidDocIndex();
+ ThreadSafeMutableRoaringBitmap getValidDocIds();
/**
* Returns the record for the given document Id. Virtual column values are
not returned.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/ThreadSafeMutableRoaringBitmap.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ThreadSafeMutableRoaringBitmap.java
similarity index 96%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/ThreadSafeMutableRoaringBitmap.java
rename to
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ThreadSafeMutableRoaringBitmap.java
index 6875e73..3e008fb 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/ThreadSafeMutableRoaringBitmap.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ThreadSafeMutableRoaringBitmap.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.local.realtime.impl;
+package org.apache.pinot.segment.spi.index;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 4c138a7..cf3e841 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -21,6 +21,8 @@ package org.apache.pinot.spi.config.table;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.pinot.spi.config.BaseJsonConfig;
@@ -30,16 +32,38 @@ public class UpsertConfig extends BaseJsonConfig {
FULL, PARTIAL, NONE
}
+ public enum Strategy {
+    // TODO: add APPEND and CUSTOM strategies
+ OVERWRITE, INCREMENT
+ }
+
private final Mode _mode;
+ private final Map<String, Strategy> _partialUpsertStrategies;
- @JsonCreator
public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode
mode) {
Preconditions.checkArgument(mode != null, "Upsert mode must be
configured");
- Preconditions.checkArgument(mode != Mode.PARTIAL, "Partial upsert mode is
not supported");
_mode = mode;
+ _partialUpsertStrategies = null;
+ }
+
+ @JsonCreator
+ public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
+ @JsonProperty(value = "partialUpsertStrategies") Map<String, Strategy>
partialUpsertStrategies) {
+ Preconditions.checkArgument(mode != null, "Upsert mode must be
configured");
+ _mode = mode;
+
+ if (mode == Mode.PARTIAL) {
+ _partialUpsertStrategies = partialUpsertStrategies != null ?
partialUpsertStrategies : new HashMap<>();
+ } else {
+ _partialUpsertStrategies = null;
+ }
}
public Mode getMode() {
return _mode;
}
+
+ public Map<String, Strategy> getPartialUpsertStrategies() {
+ return _partialUpsertStrategies;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index b5b9eb2..a496cf2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -204,6 +204,13 @@ public class GenericRow implements Serializable {
}
/**
+ * Marks a field as {@code non-null}.
+ */
+ public void removeNullValueField(String fieldName) {
+ _nullValueFields.remove(fieldName);
+ }
+
+ /**
* Removes all the fields from the row.
*/
public void clear() {
@@ -223,7 +230,8 @@ public class GenericRow implements Serializable {
}
if (obj instanceof GenericRow) {
GenericRow that = (GenericRow) obj;
- return _nullValueFields.equals(that._nullValueFields) &&
EqualityUtils.isEqual(_fieldToValueMap, that._fieldToValueMap);
+ return _nullValueFields.equals(that._nullValueFields) && EqualityUtils
+ .isEqual(_fieldToValueMap, that._fieldToValueMap);
}
return false;
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index 9664a8e..2b20849 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -18,25 +18,23 @@
*/
package org.apache.pinot.spi.config.table;
+import java.util.HashMap;
+import java.util.Map;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
public class UpsertConfigTest {
@Test
public void testUpsertConfig() {
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
+ UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL);
+ assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
- // Test illegal arguments
- try {
- new UpsertConfig(UpsertConfig.Mode.PARTIAL);
- fail();
- } catch (IllegalArgumentException e) {
- // Expected
- }
+ Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
+ partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
+ UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies);
+ assertEquals(upsertConfig2.getPartialUpsertStrategies(),
partialUpsertStratgies);
}
}
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 323de82..51c8082 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -436,6 +436,17 @@
</jvmSettings>
</program>
<program>
+
<mainClass>org.apache.pinot.tools.PartialUpsertQuickStart</mainClass>
+ <name>quick-start-partial-upsert-streaming</name>
+ <jvmSettings>
+ <initialMemorySize>1G</initialMemorySize>
+ <maxMemorySize>1G</maxMemorySize>
+ <extraArguments>
+
<extraArgument>-Dlog4j2.configurationFile=conf/quickstart-log4j2.xml</extraArgument>
+ </extraArguments>
+ </jvmSettings>
+ </program>
+ <program>
<mainClass>org.apache.pinot.tools.JsonIndexQuickStart</mainClass>
<name>quick-start-json-index-batch</name>
<jvmSettings>
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
new file mode 100644
index 0000000..b77cd5d
--- /dev/null
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.Quickstart.Color;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+import static org.apache.pinot.tools.Quickstart.printStatus;
+
+
+public class PartialUpsertQuickStart {
+ private StreamDataServerStartable _kafkaStarter;
+
+ public static void main(String[] args)
+ throws Exception {
+ PluginManager.get().init();
+ new PartialUpsertQuickStart().execute();
+ }
+
+  // TODO: add a quick-start demo
+ public void execute()
+ throws Exception {
+ File quickstartTmpDir = new File(FileUtils.getTempDirectory(),
String.valueOf(System.currentTimeMillis()));
+ File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
+ File dataDir = new File(bootstrapTableDir, "data");
+ Preconditions.checkState(dataDir.mkdirs());
+
+ File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
+ File tableConfigFile = new File(bootstrapTableDir,
"meetupRsvp_realtime_table_config.json");
+
+ ClassLoader classLoader = Quickstart.class.getClassLoader();
+ URL resource =
classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, schemaFile);
+ resource =
+
classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json");
+ Preconditions.checkNotNull(resource);
+ FileUtils.copyURLToFile(resource, tableConfigFile);
+
+ QuickstartTableRequest request = new
QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
+ final QuickstartRunner runner = new
QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);
+
+ printStatus(Color.CYAN, "***** Starting Kafka *****");
+ final ZkStarter.ZookeeperInstance zookeeperInstance =
ZkStarter.startLocalZkServer();
+ try {
+ _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+ KafkaStarterUtils.getDefaultKafkaConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+ }
+ _kafkaStarter.start();
+ _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(2));
+ printStatus(Color.CYAN, "***** Starting meetup data stream and publishing
to Kafka *****");
+ MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
+ meetupRSVPProvider.run();
+ printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
+ runner.startAll();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
+ runner.stop();
+ meetupRSVPProvider.stopPublishing();
+ _kafkaStarter.stop();
+ ZkStarter.stopLocalZkServer(zookeeperInstance);
+ FileUtils.deleteDirectory(quickstartTmpDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+ printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
+ runner.bootstrapTable();
+ printStatus(Color.CYAN, "***** Waiting for 15 seconds for a few events to
get populated *****");
+ Thread.sleep(15000);
+
+ printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
+ printStatus(Color.YELLOW, "***** The expected behavior for total number of
documents per PK should be 1 *****");
+ printStatus(Color.YELLOW,
+ "***** The expected behavior for total number of rsvp_counts per PK
should >=1 since it's incremented and updated. *****");
+
+    // The expected behavior for the total number of documents per PK should be 1.
+    // The expected behavior for the total number of rsvp_counts per PK should be >= 1,
+    // since it is incremented and updated.
+ String q1 =
+ "select event_id, count(*), sum(rsvp_count) from meetupRsvp group by
event_id order by sum(rsvp_count) desc limit 10";
+ printStatus(Color.YELLOW, "Total number of documents, total number of
rsvp_counts per event_id in the table");
+ printStatus(Color.CYAN, "Query : " + q1);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
+ printStatus(Color.GREEN,
"***************************************************");
+
+ printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
+ }
+}
diff --git
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
new file mode 100644
index 0000000..62eb14a
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
@@ -0,0 +1,53 @@
+{
+ "tableName": "meetupRsvp",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "mtime",
+ "timeType": "MILLISECONDS",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "1",
+ "segmentPushType": "APPEND",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "schemaName": "meetupRsvp",
+ "replicasPerPartition": "1"
+ },
+ "tenants": {},
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "nullHandlingEnabled": true,
+ "streamConfigs": {
+ "streamType": "kafka",
+ "stream.kafka.consumer.type": "lowLevel",
+ "stream.kafka.topic.name": "meetupRSVPEvents",
+ "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
+ "stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+ "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+ "stream.kafka.broker.list": "localhost:19092",
+ "realtime.segment.flush.threshold.size": 30,
+ "realtime.segment.flush.threshold.rows": 30
+ }
+ },
+ "fieldConfigList": [
+ {
+ "name": "location",
+ "encodingType":"RAW",
+ "indexType":"H3",
+ "properties": {
+ "resolutions": "5"
+ }
+ }
+ ],
+ "metadata": {
+ "customConfigs": {}
+ },
+ "routing": {
+ "instanceSelectorType": "strictReplicaGroup"
+ },
+ "upsertConfig": {
+ "mode": "PARTIAL",
+ "partialUpsertStrategies":{
+ "rsvp_count": "INCREMENT"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]