github-advanced-security[bot] commented on code in PR #19193: URL: https://github.com/apache/druid/pull/19193#discussion_r2970997731
########## multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByFrameCombiner.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit.groupby; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.processor.FrameCombiner; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.FrameCursor; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseSingleValueDimensionSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link FrameCombiner} for groupBy queries. Combines aggregate values for rows with + * identical dimension keys using {@link AggregatorFactory#combine}. + */ +public class GroupByFrameCombiner implements FrameCombiner +{ + private final RowSignature signature; + private final int aggregatorStart; + private final Object[] aggregateValues; + private final List<AggregatorFactory> aggregatorFactories; + private final CombinedColumnSelectorFactory combinedColumnSelectorFactory; + + private FrameReader frameReader; + + /** + * Frame for {@link #cachedCursor} and {@link #cachedAggregatorSelectors}. Updated by {@link #getCursor(Frame)}. + */ + @Nullable + private Frame cachedFrame; + + /** + * Cached cursor for the current frame. Updated by {@link #getCursor(Frame)}. + */ + @Nullable + private FrameCursor cachedCursor; + + /** + * Cached aggregator selectors for the current frame. Updated by {@link #getCursor(Frame)}. + * Same length as {@link #aggregatorFactories}. + */ + @Nullable + private ColumnValueSelector<?>[] cachedAggregatorSelectors; + + public GroupByFrameCombiner( + final RowSignature signature, + final List<AggregatorFactory> aggregatorFactories, + final int aggregatorStart + ) + { + this.signature = signature; + this.aggregatorStart = aggregatorStart; + this.aggregatorFactories = aggregatorFactories; + this.aggregateValues = new Object[aggregatorFactories.size()]; + this.combinedColumnSelectorFactory = new CombinedColumnSelectorFactory(); + } + + @Override + public void init(final FrameReader frameReader) + { + this.frameReader = frameReader; + } + + @Override + public void reset(final Frame frame, final int row) + { + final FrameCursor cursor = getCursor(frame); + cursor.setCurrentRow(row); + + // Read aggregate values from this row using cached selectors. + for (int i = 0; i < aggregatorFactories.size(); i++) { + aggregateValues[i] = cachedAggregatorSelectors[i].getObject(); + } + } + + @Override + public void combine(final Frame frame, final int row) + { + final FrameCursor cursor = getCursor(frame); + cursor.setCurrentRow(row); + + // Read and combine aggregate values using cached selectors. + for (int i = 0; i < aggregatorFactories.size(); i++) { + final Object newValue = cachedAggregatorSelectors[i].getObject(); + aggregateValues[i] = aggregatorFactories.get(i).combine(aggregateValues[i], newValue); + } + } + + @Override + public ColumnSelectorFactory getCombinedColumnSelectorFactory() + { + return combinedColumnSelectorFactory; + } + + /** + * Returns a cursor for the given frame, reusing a cached cursor if the frame has not changed. + * Also rebuilds {@link #cachedAggregatorSelectors} when the cursor changes. + */ + private FrameCursor getCursor(final Frame frame) + { + //noinspection ObjectEquality + if (frame != cachedFrame) { + cachedFrame = frame; + cachedCursor = FrameProcessors.makeCursor(frame, frameReader); + + // Reset dimension selectors, they need to be recreated for the new cursor. + combinedColumnSelectorFactory.resetSelectorCache(); + + // Rebuild aggregator selectors for the new cursor. + final ColumnSelectorFactory columnSelectorFactory = cachedCursor.getColumnSelectorFactory(); + cachedAggregatorSelectors = new ColumnValueSelector<?>[aggregatorFactories.size()]; + for (int i = 0; i < aggregatorFactories.size(); i++) { + cachedAggregatorSelectors[i] = + columnSelectorFactory.makeColumnValueSelector(signature.getColumnName(aggregatorStart + i)); + } + } + return cachedCursor; + } + + /** + * ColumnSelectorFactory that reads dimension columns from the cached frame cursor, and aggregate columns from + * {@link #aggregateValues}. Key columns can be read from any row in the current group, since + * all rows in a group share the same key. The cached cursor is always positioned at the most recent row passed to + * {@link #reset} or {@link #combine}. + */ + private class CombinedColumnSelectorFactory implements ColumnSelectorFactory + { + /** + * Cached dimension value selectors from {@link #cachedCursor}. + */ + private final Map<String, ColumnValueSelector<?>> valueDimensionSelectorCache = new HashMap<>(); + + /** + * Cached dimension string selectors from {@link #cachedCursor}. + */ + private final Map<DimensionSpec, DimensionSelector> stringDimensionSelectorCache = new HashMap<>(); + + @Override + public DimensionSelector makeDimensionSelector(final DimensionSpec dimensionSpec) + { + final int columnIndex = signature.indexOf(dimensionSpec.getDimension()); + + if (columnIndex < 0) { + return DimensionSelector.constant(null, dimensionSpec.getExtractionFn()); Review Comment: ## Deprecated method or constructor invocation Invoking [DimensionSpec.getExtractionFn](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/druid/security/code-scanning/10917) ########## processing/src/test/java/org/apache/druid/frame/processor/SummingFrameCombiner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.FrameCursor; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; + +/** + * Simple test combiner that sums a long column at {@link #sumColumnNumber}. + * All columns before {@link #sumColumnNumber} are treated as key columns. + */ +public class SummingFrameCombiner implements FrameCombiner +{ + private final RowSignature signature; + private final int sumColumnNumber; + + private FrameReader frameReader; + private FrameCursor keyCursor; + private long summedValue; + + public SummingFrameCombiner(final RowSignature signature, final int sumColumnNumber) + { + this.signature = signature; + this.sumColumnNumber = sumColumnNumber; + } + + @Override + public void init(final FrameReader frameReader) + { + this.frameReader = frameReader; + } + + @Override + public void reset(final Frame frame, final int row) + { + this.keyCursor = FrameProcessors.makeCursor(frame, frameReader); + this.keyCursor.setCurrentRow(row); + this.summedValue = readLongValue(frame, row); + } + + @Override + public void combine(final Frame frame, final int row) + { + this.summedValue += readLongValue(frame, row); + } + + @Override + public ColumnSelectorFactory getCombinedColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(final DimensionSpec dimensionSpec) + { + final int columnNumber = signature.indexOf(dimensionSpec.getDimension()); + if (columnNumber < 0) { + return DimensionSelector.constant(null, dimensionSpec.getExtractionFn()); Review Comment: ## Deprecated method or constructor invocation Invoking [DimensionSpec.getExtractionFn](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/druid/security/code-scanning/10918) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
