leventov commented on a change in pull request #7838: Improve IncrementalIndex concurrency scalability
URL: https://github.com/apache/incubator-druid/pull/7838#discussion_r295934142
 
 

 ##########
 File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 ##########
 @@ -624,46 +626,67 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow 
row)
     if (row.getTimestampFromEpoch() < minTimestamp) {
       throw new IAE("Cannot add row[%s] because it is below the 
minTimestamp[%s]", row, DateTimes.utc(minTimestamp));
     }
-    final List<String> rowDimensions = row.getDimensions();
+    DimensionData prevDimensionData = this.dimensions.get();
+    RowDimsKeyComponents dimsKeyComponents = getRowDimsKeyComponents(row, 
prevDimensionData);
+    DimensionData dimensionData = dimsKeyComponents.getUpdatedDimensionData();
+    while (dimensionData != null && 
!dimensions.compareAndSet(prevDimensionData, dimensionData)) {
+      prevDimensionData = dimensions.get();
+      dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData);
+      dimensionData = dimsKeyComponents.getUpdatedDimensionData();
+    }
 
-    Map<String, Object> rowDimKeys = new HashMap<>();
+    dimensionData = dimensions.get();
     long dimsKeySize = 0;
-    List<String> parseExceptionMessages = new ArrayList<>();
+    Object[] dims = new Object[dimensionData.size()];
+    for (Map.Entry<String, Object> dimension : 
dimsKeyComponents.getRowDimKeys().entrySet()) {
+      DimensionDesc desc = dimensionData.getDimensionDesc(dimension.getKey());
+      DimensionIndexer indexer = desc.getIndexer();
+      Object dimsKey = dimension.getValue();
+      dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
+      dims[desc.getIndex()] = dimsKey;
+    }
 
-    DimensionData prevDimensionData = this.dimensions.get();
+    long truncated = 0;
+    if (row.getTimestamp() != null) {
+      truncated = gran.bucketStart(row.getTimestamp()).getMillis();
+    }
+
+    IncrementalIndexRow incrementalIndexRow = 
IncrementalIndexRow.createTimeAndDimswithDimsKeySize(
+        Math.max(truncated, minTimestamp),
+        dims,
+        dimensionData.getDimensionDescsList(),
+        dimsKeySize
+    );
+    return new IncrementalIndexRowResult(incrementalIndexRow, 
dimsKeyComponents.getParseExceptionMessages());
+  }
+
+  // Note: This method might be called  from multiple parallel threads without 
synchronization. Making thread-unsafe
 
 Review comment:
   Please turn this into a Javadoc comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org

Reply via email to