mcvsubbu commented on a change in pull request #5722:
URL: https://github.com/apache/incubator-pinot/pull/5722#discussion_r458231984



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -432,202 +409,193 @@ public void addExtraColumns(Schema newSchema) {
     _logger.info("Newly added columns: " + 
_newlyAddedColumnsFieldMap.toString());
   }
 
+  // NOTE: Okay for single-writer
+  @SuppressWarnings("NonAtomicOperationOnVolatileField")
   @Override
   public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
-    boolean canTakeMore;
     // Update dictionary first
-    Map<String, Object> dictIdMap = updateDictionary(row);
-
-    int numDocs = _numDocsIndexed;
+    updateDictionary(row);
 
     // If metrics aggregation is enabled and if the dimension values were 
already seen, this will return existing docId,
     // else this will return a new docId.
-    int docId = getOrCreateDocId(dictIdMap);
-
-    // docId == numDocs implies new docId.
-    if (docId == numDocs) {
-      // Add forward and inverted indices for new document.
-      addForwardIndex(row, docId, dictIdMap);
-      addInvertedIndex(row, docId, dictIdMap);
-      if (_nullHandlingEnabled) {
-        handleNullValues(row, docId);
-      }
+    int docId = getOrCreateDocId();
 
+    boolean canTakeMore;
+    if (docId == _numDocsIndexed) {
+      // New document
+      addNewDocument(row, docId);
       // Update number of document indexed at last to make the latest record 
queryable
       canTakeMore = _numDocsIndexed++ < _capacity;
     } else {
-      Preconditions
-          .checkState(_aggregateMetrics, "Invalid document-id during indexing: 
" + docId + " expected: " + numDocs);
-      // Update metrics for existing document.
-      canTakeMore = aggregateMetrics(row, docId);
+      // Aggregate metrics for existing document
+      assert _aggregateMetrics;
+      aggregateMetrics(row, docId);
+      canTakeMore = true;
     }
 
+    // Update last indexed time and latest ingestion time
     _lastIndexedTimeMs = System.currentTimeMillis();
-
-    if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != 
Long.MIN_VALUE) {
+    if (rowMetadata != null) {
       _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, 
rowMetadata.getIngestionTimeMs());
     }
+
     return canTakeMore;
   }
 
-  private Map<String, Object> updateDictionary(GenericRow row) {
-    Map<String, Object> dictIdMap = new HashMap<>();
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
-      String column = fieldSpec.getName();
+  private void updateDictionary(GenericRow row) {
+    for (Map.Entry<String, IndexContainer> entry : 
_indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IndexContainer indexContainer = entry.getValue();
       Object value = row.getValue(column);
-
-      BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+      BaseMutableDictionary dictionary = indexContainer._dictionary;
       if (dictionary != null) {
-        if (fieldSpec.isSingleValueField()) {
-          dictIdMap.put(column, dictionary.index(value));
+        if (indexContainer._fieldSpec.isSingleValueField()) {
+          indexContainer._dictId = dictionary.index(value);
         } else {
-          int[] dictIds = dictionary.index((Object[]) value);
-          dictIdMap.put(column, dictIds);
-
-          // No need to update min/max time value as time column cannot be 
multi-valued
-          continue;
+          indexContainer._dictIds = dictionary.index((Object[]) value);
         }
+
+        // Update min/max value from dictionary
+        indexContainer._minValue = dictionary.getMinVal();
+        indexContainer._maxValue = dictionary.getMaxVal();
       }
     }
-    return dictIdMap;
   }
 
-  private void addForwardIndex(GenericRow row, int docId, Map<String, Object> 
dictIdMap) {
-    // Store dictionary Id(s) for columns with dictionary
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
-      String column = fieldSpec.getName();
+  private void addNewDocument(GenericRow row, int docId) {
+    for (Map.Entry<String, IndexContainer> entry : 
_indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IndexContainer indexContainer = entry.getValue();
       Object value = row.getValue(column);
-      NumValuesInfo numValuesInfo = _numValuesInfoMap.get(column);
+      FieldSpec fieldSpec = indexContainer._fieldSpec;
       if (fieldSpec.isSingleValueField()) {
-        // SV column
-        MutableForwardIndex mutableForwardIndex = _forwardIndexMap.get(column);
-        Integer dictId = (Integer) dictIdMap.get(column);
-        if (dictId != null) {
-          // SV Column with dictionary
-          mutableForwardIndex.setDictId(docId, dictId);
+        // Single-value column
+
+        // Update numValues info
+        indexContainer._numValuesInfo.updateSVEntry();
+
+        // Update indexes
+        MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+        int dictId = indexContainer._dictId;
+        if (dictId >= 0) {
+          // Dictionary-encoded single-value column
+
+          // Update forward index
+          forwardIndex.setDictId(docId, dictId);
+
+          // Update inverted index
+          RealtimeInvertedIndexReader invertedIndex = 
indexContainer._invertedIndex;
+          if (invertedIndex != null) {
+            invertedIndex.add(dictId, docId);
+          }
         } else {
-          // No-dictionary SV column
+          // Single-value column with raw index
+
+          // Update forward index
           DataType dataType = fieldSpec.getDataType();
           switch (dataType) {
             case INT:
-              mutableForwardIndex.setInt(docId, (Integer) value);
+              forwardIndex.setInt(docId, (Integer) value);
               break;
             case LONG:
-              mutableForwardIndex.setLong(docId, (Long) value);
+              forwardIndex.setLong(docId, (Long) value);
               break;
             case FLOAT:
-              mutableForwardIndex.setFloat(docId, (Float) value);
+              forwardIndex.setFloat(docId, (Float) value);
               break;
             case DOUBLE:
-              mutableForwardIndex.setDouble(docId, (Double) value);
+              forwardIndex.setDouble(docId, (Double) value);
               break;
             case STRING:
-              mutableForwardIndex.setString(docId, (String) value);
+              forwardIndex.setString(docId, (String) value);
               break;
             case BYTES:
-              mutableForwardIndex.setBytes(docId, (byte[]) value);
+              forwardIndex.setBytes(docId, (byte[]) value);
               break;
             default:
               throw new UnsupportedOperationException(
                   "Unsupported data type: " + dataType + " for no-dictionary 
column: " + column);
           }
+
+          // Update min/max value from raw value
+          // NOTE: Skip updating min/max value for aggregated metrics because 
the value will change over time.
+          if (!_aggregateMetrics || fieldSpec.getFieldType() != 
FieldSpec.FieldType.METRIC) {

Review comment:
       Is the check for aggregateMetrics redundant here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to