clintropolis commented on code in PR #15700: URL: https://github.com/apache/druid/pull/15700#discussion_r1470939661
########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory Review Comment: this might be nice to add javadocs for since its primarily to support SQL planner and probably doesn't have many direct use cases ########## 
sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java: ########## @@ -1042,4 +1049,312 @@ public void testJoinWithSubqueries() results ); } + + @Test + public void testSingleValueFloatAgg() + { + msqIncompatible(); + skipVectorize(); + cannotVectorize(); + testQuery( + "SELECT count(*) FROM foo where m1 <= (select min(m1) + 4 from foo)", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource(GroupByQuery.builder() + .setDataSource(new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators(new FloatMinAggregatorFactory("a0", "m1")) + .build() + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns(expressionVirtualColumn( + "v0", + "(\"a0\" + 4)", + ColumnType.FLOAT + ) + ) + .setAggregatorSpecs( + aggregators( + new SingleValueAggregatorFactory( + "_a0", + "v0", + ColumnType.FLOAT + ) + ) + ) + .setLimitSpec(NoopLimitSpec.instance()) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + "j0.", + "1", + JoinType.INNER + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .filters(expressionFilter("(\"m1\" <= \"j0._a0\")")) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{5L} + ) + ); + } + + @Test + public void testSingleValueDoubleAgg() + { + msqIncompatible(); + skipVectorize(); + cannotVectorize(); + testQuery( + "SELECT count(*) FROM foo where m1 >= (select max(m1) - 3.5 from foo)", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource(GroupByQuery.builder() + .setDataSource(new QueryDataSource( + 
Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators(new FloatMaxAggregatorFactory("a0", "m1")) + .build() + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns(expressionVirtualColumn( + "v0", + "(\"a0\" - 3.5)", + ColumnType.DOUBLE + ) + ) + .setAggregatorSpecs( + aggregators( + new SingleValueAggregatorFactory( + "_a0", + "v0", + ColumnType.DOUBLE + ) + ) + ) + .setLimitSpec(NoopLimitSpec.instance()) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() Review Comment: hmm, this doesn't need to be part of this PR, but it seems like there is room for improvement in the planner here... Like in this case i think the planner should just collapse this subquery into a single timeseries query with an expression post-aggregator, instead of a group by on a timeseries ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +/** + * + */ +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + + @Nullable + Object value; + + private boolean isNullResult = false; + + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); + } + boolean isNotNull = !selector.isNull(); Review Comment: like in the buffer agg, this is probably only accurate for numeric primitive selectors, since typically things that call `getObject` do not check `isNull`. `isNull` is mainly used when you plan to call `getLong` or the like (since java primitives cannot be null, so they use this method instead). You could potentially just always call getObject, or could have a method similar to the method for the buffer agg though not sure its as useful here since getObject usually always works (i think) ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +/** + * + */ +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + + @Nullable + Object value; + + private boolean isNullResult = false; + + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); + } + boolean isNotNull = !selector.isNull(); + if (isNotNull) { + isNullResult = false; + value = selector.getObject(); + } + isAggregateInvoked = true; + } + + @Override + public Object get() + { + if (isNullResult) { + return null; + } + return value; Review Comment: this can just return value directly? ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +/** + * + */ Review Comment: nit: can just delete if not filling out javadocs ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * + */ +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + + final ColumnType columnType; + + final NullableTypeStrategy typeStrategy; + + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) + { + this.selector = selector; + this.columnType = columnType; + this.typeStrategy = columnType.getNullableStrategy(); + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.put(position, NullHandling.IS_NULL_BYTE); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); + } + + int written = typeStrategy.write( + buf, + position, + getSelectorObject(), + SingleValueAggregatorFactory.DEFAULT_MAX_STRING_SIZE + Byte.BYTES + ); + if (written < 0) { + throw new ISE("Single Value Aggregator value exceeds buffer limit"); + } + isAggregateInvoked = true; + } + + @Nullable + private Object getSelectorObject() + { + if (columnType.isNumeric() && selector.isNull()) { + return null; + } + switch (columnType.getType()) { + case LONG: + return selector.getLong(); + case FLOAT: + return selector.getFloat(); + case DOUBLE: + return selector.getDouble(); + default: + return selector.getObject(); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return 
typeStrategy.read(buf, position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + if (TypeStrategies.isNullableNull(buf, position)) { + throw new IllegalStateException("Cannot return float for Null Value"); Review Comment: nit: can use `DruidException.defensive` since this shouldn't happen in practice because callers should know to call `isNull` if values can be null (if it happens it is a coding error probably) ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +/** + * + */ +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + + @Nullable + Object value; + + private boolean isNullResult = false; + + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); Review Comment: Since this is unlikely to be used directly, I wonder if this error message should mention sub-queries? I was playing around with postgres and it returns something like ```error: more than one row returned by a subquery used as an expression``` so i wonder if we should do something similar (probably don't copy it exactly :p). It might be nice to include the name of the input column. Be sure to update the buffer agg error if we change this too ########## sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SingleValueSqlAggregator.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.aggregation.builtin; + +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.SingleValueAggregatorFactory; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.planner.Calcites; + +import javax.annotation.Nullable; + +public class SingleValueSqlAggregator extends SimpleSqlAggregator +{ + + @Override + public SqlAggFunction calciteFunction() + { + return SqlStdOperatorTable.SINGLE_VALUE; + } + + @Override + @Nullable + Aggregation getAggregation( + final String name, + final AggregateCall aggregateCall, + final ExprMacroTable macroTable, + final String fieldName + ) + { + final ColumnType valueType = Calcites.getColumnTypeForRelDataType(aggregateCall.getType()); + if (valueType == null) { + return null; + } + return Aggregation.create(createSingleValueAggregatorFactory( + valueType, + name, + fieldName + )); + } + + static AggregatorFactory createSingleValueAggregatorFactory( + final ColumnType aggregationType, + final String name, + final String fieldName + ) + { + switch (aggregationType.getType()) { + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + return new SingleValueAggregatorFactory(name, fieldName, aggregationType); + default: + // This 
error refers to the Druid type. But, we're in SQL validation. + // It should refer to the SQL type. + throw SimpleSqlAggregator.badTypeException(fieldName, "SINGLE_VALUE", aggregationType); + } Review Comment: I think I already asked this, but is this type check needed, or can it just always create a `SingleValueAggregatorFactory` with any `aggregationType`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
