Jackie-Jiang commented on a change in pull request #5722:
URL: https://github.com/apache/incubator-pinot/pull/5722#discussion_r458973805
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -432,202 +409,193 @@ public void addExtraColumns(Schema newSchema) {
_logger.info("Newly added columns: " +
_newlyAddedColumnsFieldMap.toString());
}
+ // NOTE: Okay for single-writer
+ @SuppressWarnings("NonAtomicOperationOnVolatileField")
@Override
public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
- boolean canTakeMore;
// Update dictionary first
- Map<String, Object> dictIdMap = updateDictionary(row);
-
- int numDocs = _numDocsIndexed;
+ updateDictionary(row);
// If metrics aggregation is enabled and if the dimension values were already seen, this will return existing docId,
// else this will return a new docId.
- int docId = getOrCreateDocId(dictIdMap);
-
- // docId == numDocs implies new docId.
- if (docId == numDocs) {
- // Add forward and inverted indices for new document.
- addForwardIndex(row, docId, dictIdMap);
- addInvertedIndex(row, docId, dictIdMap);
- if (_nullHandlingEnabled) {
- handleNullValues(row, docId);
- }
+ int docId = getOrCreateDocId();
+ boolean canTakeMore;
+ if (docId == _numDocsIndexed) {
+ // New document
+ addNewDocument(row, docId);
// Update number of document indexed at last to make the latest record queryable
canTakeMore = _numDocsIndexed++ < _capacity;
} else {
- Preconditions
- .checkState(_aggregateMetrics, "Invalid document-id during indexing: " + docId + " expected: " + numDocs);
- // Update metrics for existing document.
- canTakeMore = aggregateMetrics(row, docId);
+ // Aggregate metrics for existing document
+ assert _aggregateMetrics;
+ aggregateMetrics(row, docId);
+ canTakeMore = true;
}
+ // Update last indexed time and latest ingestion time
_lastIndexedTimeMs = System.currentTimeMillis();
-
- if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
+ if (rowMetadata != null) {
_latestIngestionTimeMs = Math.max(_latestIngestionTimeMs,
rowMetadata.getIngestionTimeMs());
}
+
return canTakeMore;
}
- private Map<String, Object> updateDictionary(GenericRow row) {
- Map<String, Object> dictIdMap = new HashMap<>();
- for (FieldSpec fieldSpec : _physicalFieldSpecs) {
- String column = fieldSpec.getName();
+ private void updateDictionary(GenericRow row) {
+ for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
+ String column = entry.getKey();
+ IndexContainer indexContainer = entry.getValue();
Object value = row.getValue(column);
-
- BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+ BaseMutableDictionary dictionary = indexContainer._dictionary;
if (dictionary != null) {
- if (fieldSpec.isSingleValueField()) {
- dictIdMap.put(column, dictionary.index(value));
+ if (indexContainer._fieldSpec.isSingleValueField()) {
+ indexContainer._dictId = dictionary.index(value);
} else {
- int[] dictIds = dictionary.index((Object[]) value);
- dictIdMap.put(column, dictIds);
-
- // No need to update min/max time value as time column cannot be multi-valued
- continue;
+ indexContainer._dictIds = dictionary.index((Object[]) value);
}
+
+ // Update min/max value from dictionary
+ indexContainer._minValue = dictionary.getMinVal();
+ indexContainer._maxValue = dictionary.getMaxVal();
}
}
- return dictIdMap;
}
- private void addForwardIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) {
- // Store dictionary Id(s) for columns with dictionary
- for (FieldSpec fieldSpec : _physicalFieldSpecs) {
- String column = fieldSpec.getName();
+ private void addNewDocument(GenericRow row, int docId) {
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]