rohangarg commented on code in PR #13458: URL: https://github.com/apache/druid/pull/13458#discussion_r1039007818
########## processing/src/main/java/org/apache/druid/query/rowsandcols/DefaultGroupPartitioner.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; + +import java.util.List; + +@SuppressWarnings("unused") +public class DefaultGroupPartitioner implements GroupPartitioner +{ + private final RowsAndColumns rac; + + public DefaultGroupPartitioner( + RowsAndColumns rac + ) + { + this.rac = rac; + } + + @Override + public int[] computeGroupings(List<String> columns) + { + int[] retVal = new int[rac.numRows()]; + + for (String column : columns) { + final Column theCol = rac.findColumn(column); + if (theCol == null) { + // The column doesn't exist. In this case, we assume it's always the same value: null. If it's always + // the same, then it doesn't impact grouping at all and can be entirely skipped. 
+ continue; + } + final ColumnAccessor accessor = theCol.toAccessor(); + + int currGroup = 0; + int prevGroupVal = 0; + for (int i = 1; i < retVal.length; ++i) { + if (retVal[i] == prevGroupVal) { Review Comment: this seems incorrect, should we always just do `accessor.compareCells(i - 1, i)` and then check the result of that comparison? I think that `retVal[i]` will always be 0 in this check ########## processing/src/main/java/org/apache/druid/query/rowsandcols/DefaultGroupPartitioner.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; + +import java.util.List; + +@SuppressWarnings("unused") +public class DefaultGroupPartitioner implements GroupPartitioner Review Comment: Currently this and the default sorted partitioner are same right? Are you thinking that in future, the `GroupPartitioner` may also allow hash based partitioning? 
########## processing/src/main/java/org/apache/druid/query/rowsandcols/DefaultOnHeapAggregatable.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.segment.BaseSingleValueDimensionSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.ComplexMetrics; + +import javax.annotation.Nonnull; +import 
javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable +{ + private final RowsAndColumns rac; + + public DefaultOnHeapAggregatable( + RowsAndColumns rac + ) + { + this.rac = rac; + } + + @Override + public ArrayList<Object> aggregateAll( + List<AggregatorFactory> aggFactories + ) + { + Aggregator[] aggs = new Aggregator[aggFactories.size()]; + + AtomicInteger currRow = new AtomicInteger(0); + int index = 0; + for (AggregatorFactory aggFactory : aggFactories) { + aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow)); + } + + int numRows = rac.numRows(); + int rowId = currRow.get(); + while (rowId < numRows) { + for (Aggregator agg : aggs) { + agg.aggregate(); + } + rowId = currRow.incrementAndGet(); + } + + ArrayList<Object> retVal = new ArrayList<>(aggs.length); + for (Aggregator agg : aggs) { + retVal.add(agg.get()); + } + return retVal; + } + + @Override + public ArrayList<Object[]> aggregateCumulative(List<AggregatorFactory> aggFactories) + { + Aggregator[] aggs = new Aggregator[aggFactories.size()]; + ArrayList<Object[]> retVal = new ArrayList<>(aggFactories.size()); + + int numRows = rac.numRows(); + AtomicInteger currRow = new AtomicInteger(0); + int index = 0; + for (AggregatorFactory aggFactory : aggFactories) { + aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow)); + retVal.add(new Object[numRows]); + } + + int rowId = currRow.get(); + while (rowId < numRows) { + for (int i = 0; i < aggs.length; ++i) { + aggs[i].aggregate(); Review Comment: maybe could add a new method which returns the current state upon calling `aggregate` - the default impl. 
would do `aggregate + get`. No need to do it, though, if there seems to be no future use-case for it. ########## processing/src/main/java/org/apache/druid/query/rowsandcols/DefaultOnHeapAggregatable.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.segment.BaseSingleValueDimensionSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.ComplexMetrics; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable +{ + private final RowsAndColumns rac; + + public DefaultOnHeapAggregatable( Review Comment: I'm not sure what the correct name would be, but this name seems a bit off. The methods of this class independently build aggregate tables on the `RowsAndColumns` object. I mean that this class is mostly stateless wrt the aggregate table. ########## processing/src/main/java/org/apache/druid/query/rowsandcols/DefaultOnHeapAggregatable.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.segment.BaseSingleValueDimensionSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.ComplexMetrics; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import 
java.util.function.Function; + +public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable +{ + private final RowsAndColumns rac; + + public DefaultOnHeapAggregatable( + RowsAndColumns rac + ) + { + this.rac = rac; + } + + @Override + public ArrayList<Object> aggregateAll( + List<AggregatorFactory> aggFactories + ) + { + Aggregator[] aggs = new Aggregator[aggFactories.size()]; + + AtomicInteger currRow = new AtomicInteger(0); + int index = 0; + for (AggregatorFactory aggFactory : aggFactories) { + aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow)); + } + + int numRows = rac.numRows(); + int rowId = currRow.get(); + while (rowId < numRows) { + for (Aggregator agg : aggs) { + agg.aggregate(); + } + rowId = currRow.incrementAndGet(); + } + + ArrayList<Object> retVal = new ArrayList<>(aggs.length); + for (Aggregator agg : aggs) { + retVal.add(agg.get()); + } + return retVal; Review Comment: we should close the aggregators before returning ########## processing/src/main/java/org/apache/druid/query/rowsandcols/GroupPartitioner.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.rowsandcols; + +import java.util.List; + +/** + * A semantic interface used to partition a data set based on a given set of dimensions. + */ +@SuppressWarnings("unused") +public interface GroupPartitioner +{ + /** + * Computes the groupings of the underlying rows based on the columns passed in for grouping. The grouping is + * returned as an int[], the length of the array will be equal to the number of rows of data and the values of + * the elements of the array will be the same when the rows are part of the same group and different when the + * rows are part of different groups. This is contrasted with the SortedGroupPartitioner in that, the + * groupings returned are not necessarily contiguous. There is also no sort-order implied by the `int` values + * assigned to each grouping. + * + * @param columns the columns to group with + * @return the groupings, rows with the same int value are in the same group. There is no sort-order implied by the + * int values. + */ + int[] computeGroupings(List<String> columns); Review Comment: Just a reminder: using `int[]` would limit the processing to INT_MAX values even if we have memory left. If we want to support partitioning more values than that, we could consider using `BigArrays` from fastutil or something like that. ########## processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableRowsAndColumns.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.query.rowsandcols.column.Column; + +/** + * A RowsAndColumns that supposed appending columns. This interface is particularly useful because even if there is Review Comment: ```suggestion * A RowsAndColumns that supports appending columns. This interface is particularly useful because even if there is ``` ########## processing/src/main/java/org/apache/druid/query/rowsandcols/DefaultOnHeapAggregatable.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.segment.BaseSingleValueDimensionSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.ComplexMetrics; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable +{ + private final RowsAndColumns rac; + + public DefaultOnHeapAggregatable( + RowsAndColumns rac + ) + { + this.rac = rac; + } + + @Override + public ArrayList<Object> aggregateAll( + List<AggregatorFactory> aggFactories + ) + { + Aggregator[] aggs = new Aggregator[aggFactories.size()]; + + AtomicInteger currRow = new AtomicInteger(0); + int index = 0; + for (AggregatorFactory aggFactory : aggFactories) { + aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow)); + } + + int numRows = rac.numRows(); + int rowId = currRow.get(); + while (rowId < 
numRows) { + for (Aggregator agg : aggs) { + agg.aggregate(); + } + rowId = currRow.incrementAndGet(); + } + + ArrayList<Object> retVal = new ArrayList<>(aggs.length); + for (Aggregator agg : aggs) { + retVal.add(agg.get()); + } + return retVal; + } + + @Override + public ArrayList<Object[]> aggregateCumulative(List<AggregatorFactory> aggFactories) + { + Aggregator[] aggs = new Aggregator[aggFactories.size()]; + ArrayList<Object[]> retVal = new ArrayList<>(aggFactories.size()); + + int numRows = rac.numRows(); + AtomicInteger currRow = new AtomicInteger(0); + int index = 0; + for (AggregatorFactory aggFactory : aggFactories) { + aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow)); + retVal.add(new Object[numRows]); + } + + int rowId = currRow.get(); + while (rowId < numRows) { + for (int i = 0; i < aggs.length; ++i) { + aggs[i].aggregate(); + retVal.get(i)[rowId] = aggs[i].get(); + } + rowId = currRow.incrementAndGet(); + } + + return retVal; Review Comment: we should close the aggregators before returning ########## processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.ObjectColumnAccessorBase; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Optional; +import java.util.function.Function; + +public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns Review Comment: This is very similar to `RowBasedStorageAdapter` - but I guess we can't reuse it since there are a lot of differences in the interface ########## processing/src/main/java/org/apache/druid/query/rowsandcols/OnHeapCumulativeAggregatable.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
+ */ + +package org.apache.druid.query.rowsandcols; + +import org.apache.druid.query.aggregation.AggregatorFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * A semantic interface used to cumulatively aggregate a list of AggregatorFactories across a given set of data + * <p> + * The aggregation specifically happens on-heap and should be used in places where it is known that the data + * set can be worked with entirely on-heap. + * <p> + * Note, as we implement frame-handling for window aggregations, it is expected that this interface will undergo a + * transformation. It might be deleted and replaced with something else, or might just see a change done in place. + * Either way, there is no assumption of enforced compatibility with this interface at this point in time. + */ +public interface OnHeapCumulativeAggregatable +{ + /** + * Cumulatively aggregates the data using the {@code List<AggregatorFactory} objects. + * + * @param aggFactories definition of aggregations to be done + * @return a list of objects, one per AggregatorFactory. That is, the length of the return list should be equal to + * the length of the aggFactories list passed as an argument, while the length of the internal {@code Object[]} will + * be equivalent to the number of rows Review Comment: can we transpose the dimensions (aggs, rows) of this 2D structure to (rows, aggs)? that could also potentially allow for returning a streaming structure from `aggregateCumulative` to lessen memory reqs. The stream could be consumed by the higher operators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
