github-advanced-security[bot] commented on code in PR #19579: URL: https://github.com/apache/druid/pull/19579#discussion_r3408464125
########## processing/src/main/java/org/apache/druid/segment/incremental/OnHeapClusterGroup.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.DruidException; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorAndSize; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.EncodedKeyComponent; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.CapabilitiesBasedFormat; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnFormat; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * One cluster group within an {@link OnHeapClusteredBaseTable}: a sub-IncrementalIndex holding the non-clustering + * rows for a single clustering tuple. The clustering values themselves are NOT stored in this group's facts + * holder — they're constants captured on {@link #clusteringValues} (per the share-nothing v1 design, clustering + * values are written once on the on-disk group spec rather than per-row). + * <p> + * Each group owns its own {@link IncrementalIndex.DimensionDesc} list for the non-clustering dimensions, which + * means own {@link org.apache.druid.segment.DimensionIndexer} instances and own dictionaries. That isolation is + * what gives the persisted segment the "share nothing across groups" property — at persist time each group's + * dictionaries are written under its per-group file-bundle prefix. + * <p> + * Aggregator state is also per-group: rollup (when enabled) deduplicates within a group's facts holder, never + * across groups, since two rows with different clustering tuples land in different groups by construction. + */ +public final class OnHeapClusterGroup implements IncrementalIndexRowSelector +{ + private final OnHeapClusteredBaseTable parent; + private final Object[] clusteringValues; + private final List<Integer> clusteringValueIds; + + private final List<IncrementalIndex.DimensionDesc> dimensions; + private final Map<String, IncrementalIndex.DimensionDesc> dimensionsMap; + private final Map<String, IncrementalIndex.MetricDesc> aggregatorsMap; + private final Map<String, ColumnFormat> columnFormats; + private final AggregatorFactory[] aggregatorFactories; + private final FactsHolder factsHolder; + private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>(); + private final AtomicInteger rowCounter = new AtomicInteger(0); + private final AtomicInteger numEntries = new AtomicInteger(0); + // Group-local time position within {@link #getOrdering()}: 0 for the canonical "[__time]" group ordering that + // results from stripping the clustering-column prefix off the segment ordering. + private final int groupTimePosition; + + private final ColumnSelectorFactory virtualSelectorFactory; + private final Map<String, ColumnSelectorFactory> aggSelectors; + + OnHeapClusterGroup( + OnHeapClusteredBaseTable parent, + Object[] clusteringValues, + List<Integer> clusteringValueIds, + List<DimensionSchema> nonClusteringDimensions, + AggregatorFactory[] aggregatorFactories, + VirtualColumns virtualColumns, + IncrementalIndex.InputRowHolder inputRowHolder, + boolean rollup, + int timePosition + ) + { + this.parent = parent; + this.clusteringValues = clusteringValues; + this.clusteringValueIds = Collections.unmodifiableList(new ArrayList<>(clusteringValueIds)); + this.aggregatorFactories = aggregatorFactories; + + this.dimensions = new ArrayList<>(nonClusteringDimensions.size()); + this.dimensionsMap = new LinkedHashMap<>(); + this.columnFormats = new LinkedHashMap<>(); + initializeDimensions(nonClusteringDimensions); + + // Group-local time position. {@code timePosition} arrives from the parent as the SEGMENT-level position of + // __time within the segment's full ordering. Cluster groups strip the clustering-column prefix off that + // ordering, so the per-group position is shifted by the size of the clustering tuple. For the canonical v1 + // ordering [clustering..., __time] this comes out to 0. + final int clusteringCount = clusteringValues.length; + this.groupTimePosition = timePosition < 0 ? -1 : Math.max(0, timePosition - clusteringCount); + final int comparatorTimePosition = groupTimePosition < 0 ? dimensions.size() : groupTimePosition; + + final IncrementalIndex.IncrementalIndexRowComparator rowComparator = + new IncrementalIndex.IncrementalIndexRowComparator(comparatorTimePosition, dimensions); + if (rollup) { + this.factsHolder = new OnheapIncrementalIndex.RollupFactsHolder( + rowComparator, + dimensions, + comparatorTimePosition == 0 + ); + } else if (comparatorTimePosition == 0) { + this.factsHolder = new OnheapIncrementalIndex.PlainTimeOrderedFactsHolder(rowComparator); + } else { + this.factsHolder = new OnheapIncrementalIndex.PlainNonTimeOrderedFactsHolder(rowComparator); + } + + this.virtualSelectorFactory = new OnheapIncrementalIndex.CachingColumnSelectorFactory( + IncrementalIndex.makeColumnSelectorFactory(virtualColumns, inputRowHolder, null) + ); + this.aggregatorsMap = new LinkedHashMap<>(); + this.aggSelectors = new LinkedHashMap<>(); + initializeAggregators(virtualColumns, inputRowHolder); + } + + /** + * The constant clustering tuple for this group, in clustering-column declaration order. + */ + public Object[] getClusteringValues() + { + return clusteringValues; + } + + /** + * Per-clustering-column dictionary IDs assigned by the parent's segment-wide clustering indexers — these + * become {@code TableClusterGroupSpec.clusteringValueIds} at persist time. + */ + public List<Integer> getClusteringValueIds() + { + return clusteringValueIds; + } + + @Override + public List<IncrementalIndex.DimensionDesc> getDimensions() + { + return dimensions; + } + + @Override + public List<String> getDimensionNames(boolean includeTime) + { + if (!includeTime) { + return ImmutableList.copyOf(dimensionsMap.keySet()); + } + final ImmutableList.Builder<String> listBuilder = ImmutableList.builderWithExpectedSize(dimensionsMap.size() + 1); + int i = 0; + if (i == groupTimePosition) { + listBuilder.add(ColumnHolder.TIME_COLUMN_NAME); + } + for (String dimName : dimensionsMap.keySet()) { + listBuilder.add(dimName); + i++; + if (i == groupTimePosition) { + listBuilder.add(ColumnHolder.TIME_COLUMN_NAME); + } + } + return listBuilder.build(); + } + + @Override + public List<String> getMetricNames() + { + return ImmutableList.copyOf(aggregatorsMap.keySet()); + } + + @Override + @Nullable + public IncrementalIndex.DimensionDesc getDimension(String columnName) + { + return dimensionsMap.get(columnName); + } + + @Override + @Nullable + public IncrementalIndex.MetricDesc getMetric(String columnName) + { + return aggregatorsMap.get(columnName); + } + + @Override + @Nullable + public ColumnFormat getColumnFormat(String columnName) + { + if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) { + return new CapabilitiesBasedFormat(ColumnCapabilitiesImpl.createDefault().setType(ColumnType.LONG)); + } + return columnFormats.get(columnName); + } + + @Override + public List<OrderBy> getOrdering() + { + // Per-group ordering = segment ordering with the clustering-column prefix dropped (clustering values are + // constants per group, so they don't participate in intra-group sorting). + final List<OrderBy> segmentOrdering = parent.getSpec().getOrdering(); + final int clusteringCount = clusteringValues.length; + if (segmentOrdering.size() <= clusteringCount) { + return List.of(); + } + return List.copyOf(segmentOrdering.subList(clusteringCount, segmentOrdering.size())); + } + + @Override + public int getTimePosition() + { + return groupTimePosition; + } + + @Override + public boolean isEmpty() + { + return numEntries.get() == 0; + } + + @Override + public int numRows() + { + return numEntries.get(); + } + + @Override + public FactsHolder getFacts() + { + return factsHolder; + } + + @Override + public int getLastRowIndex() + { + // rowCounter is post-incremented only when a new row is added (see addToFacts), so it holds the count of rows; + // the last assigned 0-based index is one less. Mirrors OnheapIncrementalIndex.getLastRowIndex(). Returning the + // count instead would let a row appended after a cursor captured this value leak into that in-flight query. + return rowCounter.get() - 1; + } + + @Override + public float getMetricFloatValue(int rowOffset, int aggOffset) + { + return aggregators.get(rowOffset)[aggOffset].getFloat(); + } + + @Override + public long getMetricLongValue(int rowOffset, int aggOffset) + { + return aggregators.get(rowOffset)[aggOffset].getLong(); + } + + @Override + public double getMetricDoubleValue(int rowOffset, int aggOffset) + { + return aggregators.get(rowOffset)[aggOffset].getDouble(); + } + + @Override + @Nullable + public Object getMetricObjectValue(int rowOffset, int aggOffset) + { + return aggregators.get(rowOffset)[aggOffset].get(); + } + + @Override + public boolean isNull(int rowOffset, int aggOffset) + { + return aggregators.get(rowOffset)[aggOffset].isNull(); + } + + @Override + @Nullable + public ColumnCapabilities getColumnCapabilities(String column) + { + if (ColumnHolder.TIME_COLUMN_NAME.equals(column)) { + return ColumnCapabilitiesImpl.createDefault().setType(ColumnType.LONG).setHasNulls(false); + } + final IncrementalIndex.DimensionDesc dim = dimensionsMap.get(column); + if (dim != null) { + return dim.getCapabilities(); + } + final IncrementalIndex.MetricDesc metric = aggregatorsMap.get(column); + if (metric != null) { + return metric.getCapabilities(); + } + return null; + } + + public AggregatorFactory[] getAggregatorFactories() + { + return aggregatorFactories; + } + + /** + * Add a row's non-clustering content to this group's facts holder. Clustering values aren't re-processed here — + * they've already been resolved + encoded by the parent {@link OnHeapClusteredBaseTable} as part of selecting + * this group. Returns true when the row created a new fact entry (vs. rolling up into an existing one), so the + * parent index can keep its segment-wide entry count in sync. + */ + boolean addToFacts( + InputRow row, + long bucketedTimestamp, + List<String> parseExceptionMessages, + AtomicLong totalSizeInBytes + ) + { + final Object[] groupDims = new Object[dimensions.size()]; + long dimsKeySize = 0L; + for (int i = 0; i < dimensions.size(); i++) { + final IncrementalIndex.DimensionDesc desc = dimensions.get(i); + try { + @SuppressWarnings({"unchecked", "rawtypes"}) + final EncodedKeyComponent<?> k = ((org.apache.druid.segment.DimensionIndexer) desc.getIndexer()) + .processRowValsToUnsortedEncodedKeyComponent(row.getRaw(desc.getName()), true); + groupDims[i] = k.getComponent(); + dimsKeySize += k.getEffectiveSizeBytes(); + } + catch (org.apache.druid.java.util.common.parsers.ParseException pe) { + parseExceptionMessages.add(pe.getMessage()); + } + } + totalSizeInBytes.addAndGet(dimsKeySize); + + final IncrementalIndexRow subKey = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( + bucketedTimestamp, + groupDims, + dimensions, + dimsKeySize + ); + + final int priorIndex = factsHolder.getPriorIndex(subKey); + final Aggregator[] aggs; + if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) { + aggs = aggregators.get(priorIndex); + final long aggSizeDelta = OnheapIncrementalIndex.doAggregate( + aggregatorFactories, + aggs, + parent.getInputRowHolder(), + parseExceptionMessages, + false + ); + totalSizeInBytes.addAndGet(aggSizeDelta); + return false; + } else { + aggs = new Aggregator[aggregatorFactories.length]; + long aggSizeForRow = factorizeAggs(aggs); + aggSizeForRow += OnheapIncrementalIndex.doAggregate( + aggregatorFactories, + aggs, + parent.getInputRowHolder(), + parseExceptionMessages, + false + ); + final int rowIndex = rowCounter.getAndIncrement(); + aggregators.put(rowIndex, aggs); + final int prev = factsHolder.putIfAbsent(subKey, rowIndex); + if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { + numEntries.incrementAndGet(); + } else { + throw DruidException.defensive("Encountered existing fact entry for new key in cluster group"); + } + final long rowSize = subKey.estimateBytesInMemory() + + aggSizeForRow + + OnheapIncrementalIndex.ROUGH_OVERHEAD_PER_MAP_ENTRY; + totalSizeInBytes.addAndGet(rowSize); + return true; + } + } + + private void initializeDimensions(List<DimensionSchema> nonClusteringDimensions) + { + int i = 0; + for (DimensionSchema schema : nonClusteringDimensions) { + if (ColumnHolder.TIME_COLUMN_NAME.equals(schema.getName())) { + // __time is handled via the row timestamp, not as a dim desc + continue; + } + final IncrementalIndex.DimensionDesc desc = new IncrementalIndex.DimensionDesc( + i++, + schema.getName(), + schema.getDimensionHandler() + ); + dimensions.add(desc); + dimensionsMap.put(schema.getName(), desc); + columnFormats.put(schema.getName(), desc.getIndexer().getFormat()); + } + } + + private void initializeAggregators( + VirtualColumns virtualColumns, Review Comment: ## CodeQL / Useless parameter The parameter 'virtualColumns' is never used. [Show more details](https://github.com/apache/druid/security/code-scanning/11308) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
