LakshSingla commented on code in PR #15700: URL: https://github.com/apache/druid/pull/15700#discussion_r1479315297
########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + @Nullable + Object value; + private boolean isNullResult = true; + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + Object selectorObject = selector.getObject(); + boolean isNotNull = (selectorObject != null); + if (isNotNull) { + isNullResult = false; + value = selectorObject; + } Review Comment: nit: Coding style: Now that it's written this way, it seems redundant to have `isNullResult` and `isNotNull`. 
Following seems a much cleaner way ```suggestion value = selector.getObject(); } ``` We don't use any null check here. The method `isNull` will do something like: ```java public boolean isNull() { return value == null; } ``` Then the methods relying on the `isNullResult` can use `isNull()` instead. ```java public float getFloat() { return isNull() ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue(); } ``` This will prevent the code from maintaining two variables denoting same information. ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + @Nullable + Object value; Review Comment: nit: `private` for consistency. 
########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + final ColumnType columnType; + final NullableTypeStrategy typeStrategy; + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) + { + this.selector = selector; + this.columnType = columnType; + this.typeStrategy = columnType.getNullableStrategy(); + } + + @Override + public void init(ByteBuffer buf, int position) + { + 
buf.put(position, NullHandling.IS_NULL_BYTE); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + + int written = typeStrategy.write( + buf, + position, + getSelectorObject(), + columnType.isNumeric() ? Double.BYTES + Byte.BYTES : SingleValueAggregatorFactory.DEFAULT_MAX_BUFFER_SIZE + ); + if (written < 0) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Single Value Aggregator value exceeds buffer limit"); Review Comment: The error message doesn't describe what happened. It should be along the lines of `"subquery's results exceed buffer limit [%s]", BUFFER_LIMIT`. Also, the error should mention the buffer size, so that the user knows what might be going on. ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value + * and is expected to throw an exception when the subquery results in more than one row + * + * <p> + * This consumes columnType as well along with name and fieldName to pass it on to underlying + * {@link SingleValueBufferAggregator} to work with different ColumnTypes + */ +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory +{ + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String name; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String fieldName; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final ColumnType columnType; + public static final int DEFAULT_MAX_BUFFER_SIZE = 1025; + + @JsonCreator + public SingleValueAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("columnType") final ColumnType columnType + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; Review 
Comment: nit: can combine statements. Also, the error message and class thrown by the check is sufficient, so perhaps we don't need a verbose message, and can directly refer to the name ```suggestion this.name = Preconditions.checkNotNull(name, "name"); this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName"); ``` ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value + * and is expected to throw an exception when the subquery results in more than one row + * + * <p> + * This consumes columnType as well along with name and fieldName to pass it on to underlying + * {@link SingleValueBufferAggregator} to work with different ColumnTypes + */ +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory +{ + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String name; + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String fieldName; + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final ColumnType columnType; + + public static final int DEFAULT_MAX_STRING_SIZE = 1024; + + @JsonCreator + public SingleValueAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("columnType") final ColumnType columnType + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); 
+ + this.name = name; + this.fieldName = fieldName; + this.columnType = columnType; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + return new SingleValueAggregator(selector); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName); + ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null); + return new SingleValueBufferAggregator(selector, columnType); + } + + @Override + public Comparator getComparator() + { + throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare"); + } + + @Override + @Nullable + public Object combine(@Nullable Object lhs, @Nullable Object rhs) Review Comment: Thanks! I think we should add a comment then, stating the combine is never called, and the combining aggregator is called for the fieldName only. ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value + * and is expected to throw an exception when the subquery results in more than one row + * + * <p> + * This consumes columnType as well along with name and fieldName to pass it on to underlying + * {@link SingleValueBufferAggregator} to work with different ColumnTypes + */ +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory +{ + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) Review Comment: This JsonInclude is not required, name and fieldName are never null (check the preconditions in the constructor). Also, will columnType ever be null? Will the aggregator work correctly in that case? ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + final ColumnType columnType; + final NullableTypeStrategy typeStrategy; + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) + { + this.selector = selector; + this.columnType = columnType; + this.typeStrategy = columnType.getNullableStrategy(); + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.put(position, NullHandling.IS_NULL_BYTE); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + + int written = typeStrategy.write( + buf, + position, + getSelectorObject(), + columnType.isNumeric() ? 
Double.BYTES + Byte.BYTES : SingleValueAggregatorFactory.DEFAULT_MAX_BUFFER_SIZE Review Comment: Not very relevant, but I think we don't take care of longs and floats properly. ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value + * and is expected to throw an exception when the subquery results in more than one row + * + * <p> + * This consumes columnType as well along with name and fieldName to pass it on to underlying + * {@link SingleValueBufferAggregator} to work with different ColumnTypes + */ +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory +{ + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String name; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String fieldName; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final ColumnType columnType; + public static final int DEFAULT_MAX_BUFFER_SIZE = 1025; + + @JsonCreator + public SingleValueAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("columnType") final ColumnType columnType + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; + 
this.columnType = columnType; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + return new SingleValueAggregator(selector); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName); + ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null); + return new SingleValueBufferAggregator(selector, columnType); + } + + @Override + public Comparator getComparator() + { + throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare"); + } + + @Override + @Nullable + public Object combine(@Nullable Object lhs, @Nullable Object rhs) + { + throw DruidException.defensive("Single Value Aggregator would not have more than one row to combine"); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new SingleValueAggregatorFactory(name, name, columnType); + } + + @Override + public Object deserialize(Object object) + { + return object; + } + + @Nullable + @Override + public Object finalizeComputation(@Nullable Object object) + { + return object; + } + + @Override + public ColumnType getIntermediateType() + { + return columnType; + } + + @Override + public ColumnType getResultType() + { + return columnType; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List<String> requiredFields() + { + return Collections.singletonList(fieldName); + } + + @Override + public int getMaxIntermediateSize() + { + if (columnType.isNumeric()) { + return Byte.BYTES + Double.BYTES; + } + return DEFAULT_MAX_BUFFER_SIZE; Review Comment: nit: Is this 
correct, for long values? ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + final ColumnType columnType; + final NullableTypeStrategy typeStrategy; + private boolean isAggregateInvoked = false; Review Comment: nit: mark private for consistency ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + @Nullable + Object value; + private boolean isNullResult = true; + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + Object selectorObject = selector.getObject(); + boolean isNotNull = (selectorObject != null); + if (isNotNull) { + isNullResult = false; + value = selectorObject; + } + isAggregateInvoked = true; + } + + @Override + public Object get() + { + return value; + } + + @Override + public float getFloat() + { + return isNullResult ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue(); + } + + @Override + public long getLong() + { + return isNullResult ? 
NullHandling.ZERO_LONG : ((Number) value).longValue(); + } + + @Override + public double getDouble() + { + return isNullResult ? NullHandling.ZERO_DOUBLE : ((Number) value).doubleValue(); + } + + @Override + public boolean isNull() + { + return isNullResult; Review Comment: Let's keep isNull() true in sql compatible mode. Again, it's more of defensive coding, and calling code should do its due diligence ```suggestion return NullHandling.sqlCompatible() && value == null; ``` ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value + * and is expected to throw an exception when the subquery results in more than one row + * + * <p> + * This consumes columnType as well along with name and fieldName to pass it on to underlying + * {@link SingleValueBufferAggregator} to work with different ColumnTypes + */ +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory +{ + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String name; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String fieldName; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final ColumnType columnType; + public static final int DEFAULT_MAX_BUFFER_SIZE = 1025; Review Comment: Why 1025, and not 1024? (Latter seems more "correct") ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value + * and is expected to throw an exception when the subquery results in more than one row + * + * <p> + * This consumes columnType as well along with name and fieldName to pass it on to underlying + * {@link SingleValueBufferAggregator} to work with different ColumnTypes + */ +@JsonTypeName("singleValue") +public class SingleValueAggregatorFactory extends AggregatorFactory +{ + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String name; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final String fieldName; + 
@JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + private final ColumnType columnType; + public static final int DEFAULT_MAX_BUFFER_SIZE = 1025; + + @JsonCreator + public SingleValueAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("columnType") final ColumnType columnType + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; + this.columnType = columnType; Review Comment: Can columnType be null? ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + @Nullable + Object value; + private boolean isNullResult = true; + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + Object selectorObject = selector.getObject(); + boolean isNotNull = (selectorObject != null); + if (isNotNull) { + isNullResult = false; + value = selectorObject; + } + isAggregateInvoked = true; + } + + @Override + public Object get() + { + return value; + } + + @Override + public float getFloat() + { + return isNullResult ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue(); + } + + @Override + public long getLong() + { + return isNullResult ? NullHandling.ZERO_LONG : ((Number) value).longValue(); + } + + @Override + public double getDouble() + { + return isNullResult ? NullHandling.ZERO_DOUBLE : ((Number) value).doubleValue(); + } + + @Override + public boolean isNull() + { + return isNullResult; + } + + @Override + public void close() + { + // no resources to cleanup + } + + @Override + public String toString() + { + return "SingleValueAggregator{" + + "selector=" + selector + + '}'; Review Comment: Shouldn't this also print the `value` and `aggregateInvoked`? ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class SingleValueAggregator implements Aggregator +{ + final ColumnValueSelector selector; + @Nullable + Object value; + private boolean isNullResult = true; + private boolean isAggregateInvoked = false; + + public SingleValueAggregator(ColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate() + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + Object selectorObject = selector.getObject(); + boolean isNotNull = (selectorObject != null); + if (isNotNull) { + isNullResult = false; + value = selectorObject; + } + isAggregateInvoked = true; + } + + @Override + public Object get() + { + return value; + } + + @Override + public float getFloat() + { + return isNullResult ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue(); + } + + @Override + public long getLong() + { + return isNullResult ? 
NullHandling.ZERO_LONG : ((Number) value).longValue(); Review Comment: Let's add an assertion in the primitive selectors that if the mode is sql compatible, then the selector cannot be null. ```suggestion assert NullHandling.replaceWithDefault() || !isNull(); return isNull() ? NullHandling.ZERO_LONG : ((Number) value).longValue(); ``` ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + final ColumnType columnType; + final NullableTypeStrategy typeStrategy; + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) Review Comment: `public`, since the `SingleValueAggregator` is also public scope ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + final ColumnType columnType; + final NullableTypeStrategy typeStrategy; + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) + { + this.selector = selector; + this.columnType = columnType; + this.typeStrategy = columnType.getNullableStrategy(); + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.put(position, NullHandling.IS_NULL_BYTE); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + if (isAggregateInvoked) { + throw InvalidInput.exception("Subquery expression returned more than one row"); + } + + int written = typeStrategy.write( + buf, + position, + getSelectorObject(), + columnType.isNumeric() ? 
Double.BYTES + Byte.BYTES : SingleValueAggregatorFactory.DEFAULT_MAX_BUFFER_SIZE + ); + if (written < 0) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Single Value Aggregator value exceeds buffer limit"); + } + isAggregateInvoked = true; + } + + @Nullable + private Object getSelectorObject() + { + if (columnType.isNumeric() && selector.isNull()) { + return null; + } + switch (columnType.getType()) { + case LONG: + return selector.getLong(); + case FLOAT: + return selector.getFloat(); + case DOUBLE: + return selector.getDouble(); + default: + return selector.getObject(); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return typeStrategy.read(buf, position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return TypeStrategies.isNullableNull(buf, position) Review Comment: I wonder if two calls are required, since we already have the nullable type strategy, so perhaps we can read it once, and check if that is null. However, not important, since it's called for a single row only. ########## processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.TypeStrategies; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueBufferAggregator implements BufferAggregator +{ + final ColumnValueSelector selector; + final ColumnType columnType; + final NullableTypeStrategy typeStrategy; + private boolean isAggregateInvoked = false; + + SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) + { + this.selector = selector; + this.columnType = columnType; + this.typeStrategy = columnType.getNullableStrategy(); Review Comment: This would NPE if columnType is null, hence we should add the null check in the aggregator factory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
