eranmeir commented on a change in pull request #7838: Improve IncrementalIndex concurrency scalability
URL: https://github.com/apache/incubator-druid/pull/7838#discussion_r294179263
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 ##########
 @@ -627,98 +624,96 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow 
row)
     if (row.getTimestampFromEpoch() < minTimestamp) {
       throw new IAE("Cannot add row[%s] because it is below the 
minTimestamp[%s]", row, DateTimes.utc(minTimestamp));
     }
-
     final List<String> rowDimensions = row.getDimensions();
 
-    Object[] dims;
-    List<Object> overflow = null;
+    Map<String, Object> rowDimKeys = new HashMap<>();
     long dimsKeySize = 0;
     List<String> parseExceptionMessages = new ArrayList<>();
-    synchronized (dimensionDescs) {
-      dims = new Object[dimensionDescs.size()];
-      for (String dimension : rowDimensions) {
-        if (Strings.isNullOrEmpty(dimension)) {
-          continue;
-        }
-        boolean wasNewDim = false;
-        ColumnCapabilitiesImpl capabilities;
-        DimensionDesc desc = dimensionDescs.get(dimension);
-        if (desc != null) {
-          capabilities = desc.getCapabilities();
-        } else {
-          wasNewDim = true;
-          capabilities = columnCapabilities.get(dimension);
-          if (capabilities == null) {
-            capabilities = new ColumnCapabilitiesImpl();
-            // For schemaless type discovery, assume everything is a String 
for now, can change later.
-            capabilities.setType(ValueType.STRING);
-            capabilities.setDictionaryEncoded(true);
-            capabilities.setHasBitmapIndexes(true);
-            columnCapabilities.put(dimension, capabilities);
-          }
-          DimensionHandler handler = 
DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
-          desc = addNewDimension(dimension, capabilities, handler);
-        }
-        DimensionHandler handler = desc.getHandler();
-        DimensionIndexer indexer = desc.getIndexer();
-        Object dimsKey = null;
-        try {
-          dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
-              row.getRaw(dimension),
-              true
-          );
+
+    DimensionData prevDimensionData = this.dimensions.get();
+    DimensionData dimensionData = null;
+    for (String dimension : rowDimensions) {
+      if (Strings.isNullOrEmpty(dimension)) {
+        continue;
+      }
+
+      if (rowDimKeys.containsKey(dimension)) {
+        // If the dims map already contains a mapping at this index, it means 
we have seen this dimension already on this input row.
+        throw new ISE("Dimension[%s] occurred more than once in InputRow", 
dimension);
+      }
+      ColumnCapabilitiesImpl capabilities;
+      DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension);
+      if (desc != null) {
+        capabilities = desc.getCapabilities();
+      } else {
+        if (dimensionData == null) {
+          dimensionData = prevDimensionData.clone();
         }
-        catch (ParseException pe) {
-          parseExceptionMessages.add(pe.getMessage());
+        capabilities = dimensionData.getDimensionCapabilities(dimension);
+        if (capabilities == null) {
+          capabilities = new ColumnCapabilitiesImpl();
+          // For schemaless type discovery, assume everything is a String for 
now, can change later.
+          capabilities.setType(ValueType.STRING);
+          capabilities.setDictionaryEncoded(true);
+          capabilities.setHasBitmapIndexes(true);
+          dimensionData.putCapabilities(dimension, capabilities);
         }
-        dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
-        // Set column capabilities as data is coming in
-        if (!capabilities.hasMultipleValues() &&
-            dimsKey != null &&
-            handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
-          capabilities.setHasMultipleValues(true);
+        DimensionHandler handler = 
DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
+        if (dimensionData == null) {
+          dimensionData = prevDimensionData.clone();
         }
-
-        if (wasNewDim) {
-          if (overflow == null) {
-            overflow = new ArrayList<>();
-          }
-          overflow.add(dimsKey);
-        } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != 
null) {
-          /*
-           * index > dims.length requires that we saw this dimension and added 
it to the dimensionOrder map,
-           * otherwise index is null. Since dims is initialized based on the 
size of dimensionOrder on each call to add,
-           * it must have been added to dimensionOrder during this InputRow.
-           *
-           * if we found an index for this dimension it means we've seen it 
already. If !(index > dims.length) then
-           * we saw it on a previous input row (this its safe to index into 
dims). If we found a value in
-           * the dims array for this index, it means we have seen this 
dimension already on this input row.
-           */
-          throw new ISE("Dimension[%s] occurred more than once in InputRow", 
dimension);
-        } else {
-          dims[desc.getIndex()] = dimsKey;
+        desc = dimensionData.addNewDimension(dimension, capabilities, handler);
+      }
+      DimensionHandler handler = desc.getHandler();
+      DimensionIndexer indexer = desc.getIndexer();
+      Object dimsKey = null;
+      try {
+        dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
+            row.getRaw(dimension),
+            true
+        );
+      }
+      catch (ParseException pe) {
+        parseExceptionMessages.add(pe.getMessage());
+      }
+      dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
+      // Set column capabilities as data is coming in
+      if (!capabilities.hasMultipleValues() &&
+          dimsKey != null &&
+          handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
+        if (dimensionData == null) {
+          dimensionData = prevDimensionData.clone();
         }
+        capabilities = dimensionData.getDimensionCapabilities(dimension);
+        capabilities.setHasMultipleValues(true);
       }
+      rowDimKeys.put(dimension, dimsKey);
     }
 
-    if (overflow != null) {
-      // Merge overflow and non-overflow
-      Object[] newDims = new Object[dims.length + overflow.size()];
-      System.arraycopy(dims, 0, newDims, 0, dims.length);
-      for (int i = 0; i < overflow.size(); ++i) {
-        newDims[dims.length + i] = overflow.get(i);
+
+    if (dimensionData != null) {
+      while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) {
 
 Review comment:
  From what I gather, there is little contention when updating the dimension
data. We chose CAS over locking to avoid blocking threads (thread state
changes), which could otherwise lead to more costly context switches.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to