eranmeir commented on a change in pull request #7838: Improve IncrementalIndex
concurrency scalability
URL: https://github.com/apache/incubator-druid/pull/7838#discussion_r295760092
##########
File path:
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -627,98 +624,96 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow
row)
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the
minTimestamp[%s]", row, DateTimes.utc(minTimestamp));
}
-
final List<String> rowDimensions = row.getDimensions();
- Object[] dims;
- List<Object> overflow = null;
+ Map<String, Object> rowDimKeys = new HashMap<>();
long dimsKeySize = 0;
List<String> parseExceptionMessages = new ArrayList<>();
- synchronized (dimensionDescs) {
- dims = new Object[dimensionDescs.size()];
- for (String dimension : rowDimensions) {
- if (Strings.isNullOrEmpty(dimension)) {
- continue;
- }
- boolean wasNewDim = false;
- ColumnCapabilitiesImpl capabilities;
- DimensionDesc desc = dimensionDescs.get(dimension);
- if (desc != null) {
- capabilities = desc.getCapabilities();
- } else {
- wasNewDim = true;
- capabilities = columnCapabilities.get(dimension);
- if (capabilities == null) {
- capabilities = new ColumnCapabilitiesImpl();
- // For schemaless type discovery, assume everything is a String
for now, can change later.
- capabilities.setType(ValueType.STRING);
- capabilities.setDictionaryEncoded(true);
- capabilities.setHasBitmapIndexes(true);
- columnCapabilities.put(dimension, capabilities);
- }
- DimensionHandler handler =
DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
- desc = addNewDimension(dimension, capabilities, handler);
- }
- DimensionHandler handler = desc.getHandler();
- DimensionIndexer indexer = desc.getIndexer();
- Object dimsKey = null;
- try {
- dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
- row.getRaw(dimension),
- true
- );
+
+ DimensionData prevDimensionData = this.dimensions.get();
+ DimensionData dimensionData = null;
+ for (String dimension : rowDimensions) {
+ if (Strings.isNullOrEmpty(dimension)) {
+ continue;
+ }
+
+ if (rowDimKeys.containsKey(dimension)) {
+ // If rowDimKeys already contains an entry for this dimension, it means
we have seen this dimension already on this input row.
+ throw new ISE("Dimension[%s] occurred more than once in InputRow",
dimension);
+ }
+ ColumnCapabilitiesImpl capabilities;
+ DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension);
+ if (desc != null) {
+ capabilities = desc.getCapabilities();
+ } else {
+ if (dimensionData == null) {
+ dimensionData = prevDimensionData.clone();
}
- catch (ParseException pe) {
- parseExceptionMessages.add(pe.getMessage());
+ capabilities = dimensionData.getDimensionCapabilities(dimension);
+ if (capabilities == null) {
+ capabilities = new ColumnCapabilitiesImpl();
+ // For schemaless type discovery, assume everything is a String for
now, can change later.
+ capabilities.setType(ValueType.STRING);
+ capabilities.setDictionaryEncoded(true);
+ capabilities.setHasBitmapIndexes(true);
+ dimensionData.putCapabilities(dimension, capabilities);
}
- dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
- // Set column capabilities as data is coming in
- if (!capabilities.hasMultipleValues() &&
- dimsKey != null &&
- handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
- capabilities.setHasMultipleValues(true);
+ DimensionHandler handler =
DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
+ if (dimensionData == null) {
+ dimensionData = prevDimensionData.clone();
}
-
- if (wasNewDim) {
- if (overflow == null) {
- overflow = new ArrayList<>();
- }
- overflow.add(dimsKey);
- } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] !=
null) {
- /*
- * index > dims.length requires that we saw this dimension and added
it to the dimensionOrder map,
- * otherwise index is null. Since dims is initialized based on the
size of dimensionOrder on each call to add,
- * it must have been added to dimensionOrder during this InputRow.
- *
- * if we found an index for this dimension it means we've seen it
already. If !(index > dims.length) then
- * we saw it on a previous input row (thus it's safe to index into
dims). If we found a value in
- * the dims array for this index, it means we have seen this
dimension already on this input row.
- */
- throw new ISE("Dimension[%s] occurred more than once in InputRow",
dimension);
- } else {
- dims[desc.getIndex()] = dimsKey;
+ desc = dimensionData.addNewDimension(dimension, capabilities, handler);
+ }
+ DimensionHandler handler = desc.getHandler();
+ DimensionIndexer indexer = desc.getIndexer();
+ Object dimsKey = null;
+ try {
+ dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
+ row.getRaw(dimension),
+ true
+ );
+ }
+ catch (ParseException pe) {
+ parseExceptionMessages.add(pe.getMessage());
+ }
+ dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
+ // Set column capabilities as data is coming in
+ if (!capabilities.hasMultipleValues() &&
+ dimsKey != null &&
+ handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
+ if (dimensionData == null) {
+ dimensionData = prevDimensionData.clone();
}
+ capabilities = dimensionData.getDimensionCapabilities(dimension);
+ capabilities.setHasMultipleValues(true);
}
+ rowDimKeys.put(dimension, dimsKey);
}
- if (overflow != null) {
- // Merge overflow and non-overflow
- Object[] newDims = new Object[dims.length + overflow.size()];
- System.arraycopy(dims, 0, newDims, 0, dims.length);
- for (int i = 0; i < overflow.size(); ++i) {
- newDims[dims.length + i] = overflow.get(i);
+
+ if (dimensionData != null) {
+ while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) {
+ prevDimensionData = dimensions.get();
+ dimensionData.rebase(prevDimensionData);
}
- dims = newDims;
+ } else {
+ dimensionData = prevDimensionData;
+ }
+ Object[] dims = new Object[dimensionData.size()];
+ for (String dimension : rowDimKeys.keySet()) {
Review comment:
Fixed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]