clintropolis commented on code in PR #15700: URL: https://github.com/apache/druid/pull/15700#discussion_r1465686070
########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatoractory.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +@JsonTypeName("singleValue") +public class SingleValueAggregatoractory extends AggregatorFactory Review Comment: typo, should be `SingleValueAggregatorFactory` instead of `SingleValueAggregatoractory` ########## 
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SingleValueSqlAggregator.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.aggregation.builtin; + +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.SingleValueAggregatoractory; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.planner.Calcites; + +import javax.annotation.Nullable; + +public class SingleValueSqlAggregator extends SimpleSqlAggregator +{ + + @Override + public SqlAggFunction calciteFunction() + { + return SqlStdOperatorTable.SINGLE_VALUE; + } + + @Override + @Nullable + Aggregation getAggregation( + final String name, + final AggregateCall aggregateCall, + final ExprMacroTable macroTable, + final String fieldName + ) + { + final ColumnType valueType = 
Calcites.getColumnTypeForRelDataType(aggregateCall.getType()); + if (valueType == null) { + return null; + } + return Aggregation.create(createSingleValueAggregatorFactory( + valueType, + name, + fieldName + )); + } + + static AggregatorFactory createSingleValueAggregatorFactory( + final ColumnType aggregationType, + final String name, + final String fieldName + ) + { + switch (aggregationType.getType()) { + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + return new SingleValueAggregatoractory(name, fieldName, aggregationType); + default: + // This error refers to the Druid type. But, we're in SQL validation. + // It should refer to the SQL type. + throw SimpleSqlAggregator.badTypeException(fieldName, "SINGLE_VALUE", aggregationType); + } Review Comment: Do we actually need this type check at all? I'm thinking of things like `ARRAY_AGG`, which can aggregate an array of values. ########## sql/src/test/java/org/apache/druid/sql/calcite/CalciteSingleValueAggregatorTest.java: ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.sql.calcite; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory; +import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; +import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; +import org.apache.druid.query.aggregation.SingleValueAggregatoractory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.NoopLimitSpec; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; + +public class CalciteSingleValueAggregatorTest extends BaseCalciteQueryTest Review Comment: any reason not to just put these test cases in `CalciteSubqueryTest`? ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * + */ +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + + final ColumnType columnType; + + private int stringByteArrayLength = 0; + + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) + { + this.selector = selector; + this.columnType = columnType; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.put(position, NullHandling.IS_NULL_BYTE); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); + } + boolean isNotNull; + if (columnType.is(ValueType.STRING)) { + isNotNull = (selector.getObject() != null); + } else { + isNotNull = !selector.isNull(); + } + if (isNotNull) { + if (buf.get(position) == NullHandling.IS_NULL_BYTE) { + buf.put(position, NullHandling.IS_NOT_NULL_BYTE); + } + updatevalue(buf, position + Byte.BYTES); + } Review Comment: Part of the 
appeal of pushing the `ColumnType` in here is that we can also use the `NullableTypeStrategy` to implement the `aggregate` and `get` methods. I pulled your branch and tried this, and it runs into some problems with floats ending up as doubles. I think this is actually a problem with the underlying expression selector (expressions don't support floats, so that is the likely culprit), since `testSingleValueFloatAgg` fails. So we need a helper function that gets the object from the underlying selector and ensures that it matches the `columnType`. ``` @Override public void aggregate(ByteBuffer buf, int position) { if (isAggregateInvoked) { throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); } int written = typeStrategy.write( buf, position, getSelectorObject(), SingleValueAggregatoractory.DEFAULT_MAX_STRING_SIZE ); if (written < 0) { throw InvalidInput.exception("Single Value Aggregator value too big for buffer"); } isAggregateInvoked = true; } ``` (`InvalidInput` might not be the right exception for exceeding the size limit. Since this is only a single row, we might want to figure out how to pass the limit in, or use a much larger limit, because the user isn't calling this function directly.) ``` @Nullable @Override public Object get(ByteBuffer buf, int position) { return typeStrategy.read(buf, position); } ``` ``` @Nullable private Object getSelectorObject() { if (selector.isNull()) { return null; } switch (columnType.getType()) { case LONG: return selector.getLong(); case FLOAT: return selector.getFloat(); case DOUBLE: return selector.getDouble(); default: return selector.getObject(); } } ``` The `getLong`/`getFloat`/`getDouble` methods could also be updated; there are some static methods in `TypeStrategies` that can help with this, such as `isNullableNull`, `readNotNullNullableLong`, etc.
########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +/** + * + */ +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + + @Nullable + Object value; + + private boolean isNullResult = false; + + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { Review Comment: I think a null row is still a row in the context of this aggregator, so aggregating a null value also needs to set `isAggregateInvoked`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
