LakshSingla commented on code in PR #15700:
URL: https://github.com/apache/druid/pull/15700#discussion_r1479315297


##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+  @Nullable
+  Object value;
+  private boolean isNullResult = true;
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+    Object selectorObject = selector.getObject();
+    boolean isNotNull = (selectorObject != null);
+    if (isNotNull) {
+      isNullResult = false;
+      value = selectorObject;
+    }

Review Comment:
   nit: Coding style:
   
   Now that it's written this way, it seems redundant to have both `isNullResult` and `isNotNull`.
   The following seems much cleaner:
   ```suggestion
     value = selector.getObject();
       }
   ```
   We don't need any null check here.
   
   The method `isNull` will do something like:
   ```java
     public boolean isNull()
     {
       return value == null;
     }
   ```
   Then the methods that rely on `isNullResult` can use `isNull()` instead:
   ```java
     public float getFloat()
     {
       return isNull() ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue();
     }
   ```
   This avoids maintaining two variables that denote the same information.
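
   A minimal, self-contained sketch of the single-field approach described above. It is not the PR's code: `SingleValueSketch` is a hypothetical name, the `Supplier` stands in for `ColumnValueSelector#getObject`, and the literal zeros stand in for the `NullHandling.ZERO_*` constants.

   ```java
   import java.util.function.Supplier;

   // Hypothetical stand-in for the aggregator under review.
   final class SingleValueSketch
   {
     private final Supplier<Object> selector; // plays the role of ColumnValueSelector
     private Object value;                    // null doubles as the "no value" marker
     private boolean isAggregateInvoked = false;

     SingleValueSketch(Supplier<Object> selector)
     {
       this.selector = selector;
     }

     void aggregate()
     {
       if (isAggregateInvoked) {
         throw new IllegalStateException("Subquery expression returned more than one row");
       }
       // No null check needed: a null result simply leaves value == null.
       value = selector.get();
       isAggregateInvoked = true;
     }

     boolean isNull()
     {
       return value == null;
     }

     float getFloat()
     {
       return isNull() ? 0.0f : ((Number) value).floatValue();
     }

     long getLong()
     {
       return isNull() ? 0L : ((Number) value).longValue();
     }
   }
   ```

   With this shape, nullness is derived from `value` itself, so there is no second flag that can drift out of sync.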



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+  @Nullable
+  Object value;

Review Comment:
   nit: mark this `private` for consistency.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+  final ColumnType columnType;
+  final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;
+
+  SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType)
+  {
+    this.selector = selector;
+    this.columnType = columnType;
+    this.typeStrategy = columnType.getNullableStrategy();
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, NullHandling.IS_NULL_BYTE);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+
+    int written = typeStrategy.write(
+        buf,
+        position,
+        getSelectorObject(),
+        columnType.isNumeric() ? Double.BYTES + Byte.BYTES : SingleValueAggregatorFactory.DEFAULT_MAX_BUFFER_SIZE
+    );
+    if (written < 0) {
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build("Single Value Aggregator value exceeds buffer limit");

Review Comment:
   The error message doesn't describe what happened. It should be along the lines of "subquery's results exceed buffer limit [%s]", with the buffer limit passed in as the format argument.
   The error should also mention the buffer size, so that the user knows what might be going on.
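
   A rough sketch of what such a message could look like. `BufferLimitMessage` and its wording are hypothetical; in the PR the resulting string (or the format string and argument) would be handed to the `DruidException` builder rather than built by a helper like this.

   ```java
   import java.util.Locale;

   // Hypothetical helper, for illustration only.
   final class BufferLimitMessage
   {
     // Includes the configured limit so the user can see which bound was exceeded.
     static String build(int bufferLimitBytes)
     {
       return String.format(
           Locale.ROOT,
           "Subquery's result exceeds the buffer limit of [%d] bytes",
           bufferLimitBytes
       );
     }
   }
   ```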



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,200 @@
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+  public static final int DEFAULT_MAX_BUFFER_SIZE = 1025;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;

Review Comment:
   nit: these statements can be combined. Also, the exception class and message produced by the check are sufficient, so perhaps we don't need a verbose message and can refer to the field name directly:
   ```suggestion
       this.name = Preconditions.checkNotNull(name, "name");
       this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName");
   ```
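
   The combined form works because Guava's `Preconditions.checkNotNull` returns the reference it validated. A self-contained sketch of the same pattern follows; it assumes `java.util.Objects.requireNonNull` as a JDK stand-in for the Guava call, and `FactoryArgsSketch` is a hypothetical class, not the factory from the PR.

   ```java
   import java.util.Objects;

   // Hypothetical class showing validate-and-assign in one statement.
   final class FactoryArgsSketch
   {
     private final String name;
     private final String fieldName;

     FactoryArgsSketch(String name, String fieldName)
     {
       // requireNonNull returns its argument, so the check and the assignment combine.
       this.name = Objects.requireNonNull(name, "name");
       this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
     }

     String getName()
     {
       return name;
     }

     String getFieldName()
     {
       return fieldName;
     }
   }
   ```

   Passing a null `fieldName` raises a `NullPointerException` whose message is just `fieldName`, which is usually enough to locate the problem.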



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.columnType = columnType;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    return new SingleValueAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName);
+    ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null);
+    return new SingleValueBufferAggregator(selector, columnType);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare");
+  }
+
+  @Override
+  @Nullable
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)

Review Comment:
   Thanks! In that case, I think we should add a comment stating that `combine` is never called, and that the combining aggregator is only called for the fieldName.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,200 @@
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)

Review Comment:
   This `@JsonInclude` is not required: `name` and `fieldName` are never null (see the preconditions in the constructor). Also, will `columnType` ever be null? Will the aggregator work correctly in that case?



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+  final ColumnType columnType;
+  final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;
+
+  SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType)
+  {
+    this.selector = selector;
+    this.columnType = columnType;
+    this.typeStrategy = columnType.getNullableStrategy();
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, NullHandling.IS_NULL_BYTE);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+
+    int written = typeStrategy.write(
+        buf,
+        position,
+        getSelectorObject(),
+        columnType.isNumeric() ? Double.BYTES + Byte.BYTES : SingleValueAggregatorFactory.DEFAULT_MAX_BUFFER_SIZE

Review Comment:
   Not strictly related to this line, but I don't think we handle longs and floats properly here.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,200 @@
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+  public static final int DEFAULT_MAX_BUFFER_SIZE = 1025;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.columnType = columnType;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    return new SingleValueAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName);
+    ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null);
+    return new SingleValueBufferAggregator(selector, columnType);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare");
+  }
+
+  @Override
+  @Nullable
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to combine");
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new SingleValueAggregatorFactory(name, name, columnType);
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    return object;
+  }
+
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable Object object)
+  {
+    return object;
+  }
+
+  @Override
+  public ColumnType getIntermediateType()
+  {
+    return columnType;
+  }
+
+  @Override
+  public ColumnType getResultType()
+  {
+    return columnType;
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    if (columnType.isNumeric()) {
+      return Byte.BYTES + Double.BYTES;
+    }
+    return DEFAULT_MAX_BUFFER_SIZE;

Review Comment:
   nit: Is this correct for long values?
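
   For reference, a small sketch of the per-type arithmetic, assuming one leading null-marker byte followed by the primitive value. `NumericSlotSize` and its `Kind` enum are hypothetical names; the only facts relied on are the JDK constants `Long.BYTES`, `Float.BYTES` and `Double.BYTES`.

   ```java
   // Hypothetical helper: bytes needed for a nullable numeric value,
   // i.e. one null-marker byte plus the primitive's width.
   final class NumericSlotSize
   {
     enum Kind { LONG, FLOAT, DOUBLE }

     static int bytesFor(Kind kind)
     {
       switch (kind) {
         case LONG:
           return Byte.BYTES + Long.BYTES;
         case FLOAT:
           return Byte.BYTES + Float.BYTES;
         case DOUBLE:
           return Byte.BYTES + Double.BYTES;
         default:
           throw new IllegalArgumentException("Unknown kind: " + kind);
       }
     }
   }
   ```

   Under that assumption a long needs the same number of bytes as a double, while a float needs fewer, so a single `Byte.BYTES + Double.BYTES` bound is an upper limit for all three.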



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+  final ColumnType columnType;
+  final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;

Review Comment:
   nit: mark private for consistency 



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+  @Nullable
+  Object value;
+  private boolean isNullResult = true;
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+    Object selectorObject = selector.getObject();
+    boolean isNotNull = (selectorObject != null);
+    if (isNotNull) {
+      isNullResult = false;
+      value = selectorObject;
+    }
+    isAggregateInvoked = true;
+  }
+
+  @Override
+  public Object get()
+  {
+    return value;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    return isNullResult ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue();
+  }
+
+  @Override
+  public long getLong()
+  {
+    return isNullResult ? NullHandling.ZERO_LONG : ((Number) value).longValue();
+  }
+
+  @Override
+  public double getDouble()
+  {
+    return isNullResult ? NullHandling.ZERO_DOUBLE : ((Number) value).doubleValue();
+  }
+
+  @Override
+  public boolean isNull()
+  {
+    return isNullResult;

Review Comment:
   Let's make `isNull()` return true only in SQL-compatible mode. Again, it's more a matter of defensive coding, and the calling code should do its due diligence.
   ```suggestion
       return NullHandling.sqlCompatible() && value == null;
   ```
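
   To spell out the behaviour of that expression, here is a tiny self-contained sketch. `NullCheckSketch` is hypothetical, and the `sqlCompatible` parameter stands in for `NullHandling.sqlCompatible()`.

   ```java
   // Hypothetical stand-in: sqlCompatible mirrors NullHandling.sqlCompatible().
   final class NullCheckSketch
   {
     static boolean isNull(boolean sqlCompatible, Object value)
     {
       // Only report null when running in SQL-compatible mode and no value was stored.
       return sqlCompatible && value == null;
     }
   }
   ```

   In the non-SQL-compatible case this returns false even when `value` is null, so callers fall through to the primitive getters and their zero defaults.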



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,200 @@
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+  public static final int DEFAULT_MAX_BUFFER_SIZE = 1025;

Review Comment:
   Why 1025, and not 1024? (Latter seems more "correct")



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+  public static final int DEFAULT_MAX_BUFFER_SIZE = 1025;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.columnType = columnType;

Review Comment:
   Can columnType be null?



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+  @Nullable
+  Object value;
+  private boolean isNullResult = true;
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+    Object selectorObject = selector.getObject();
+    boolean isNotNull = (selectorObject != null);
+    if (isNotNull) {
+      isNullResult = false;
+      value = selectorObject;
+    }
+    isAggregateInvoked = true;
+  }
+
+  @Override
+  public Object get()
+  {
+    return value;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    return isNullResult ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue();
+  }
+
+  @Override
+  public long getLong()
+  {
+    return isNullResult ? NullHandling.ZERO_LONG : ((Number) value).longValue();
+  }
+
+  @Override
+  public double getDouble()
+  {
+    return isNullResult ? NullHandling.ZERO_DOUBLE : ((Number) value).doubleValue();
+  }
+
+  @Override
+  public boolean isNull()
+  {
+    return isNullResult;
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SingleValueAggregator{" +
+           "selector=" + selector +
+           '}';

Review Comment:
   Shouldn't this also print `value` and `isAggregateInvoked`?
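
   For illustration, a rough sketch of what that could look like. The class below is a hypothetical, self-contained stand-in and not the PR's code: `Object` replaces `ColumnValueSelector`, and only the state that `toString()` would report is kept.

```java
// Hypothetical stand-in for SingleValueAggregator, reduced to the state toString() reports.
final class SingleValueToStringSketch
{
  private final Object selector;               // stands in for the ColumnValueSelector
  private Object value;                        // stays null until a non-null row is aggregated
  private boolean isAggregateInvoked = false;

  SingleValueToStringSketch(Object selector)
  {
    this.selector = selector;
  }

  void aggregate(Object row)
  {
    if (isAggregateInvoked) {
      throw new IllegalStateException("Subquery expression returned more than one row");
    }
    value = row;
    isAggregateInvoked = true;
  }

  @Override
  public String toString()
  {
    // Include the mutable state as well, so a logged aggregator shows what it has seen so far.
    return "SingleValueAggregator{" +
           "selector=" + selector +
           ", value=" + value +
           ", isAggregateInvoked=" + isAggregateInvoked +
           '}';
  }
}
```

   With this shape, an instance that has aggregated one row reports both the stored value and the fact that `aggregate` already ran, which should make "more than one row" failures easier to debug.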



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+  @Nullable
+  Object value;
+  private boolean isNullResult = true;
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+    Object selectorObject = selector.getObject();
+    boolean isNotNull = (selectorObject != null);
+    if (isNotNull) {
+      isNullResult = false;
+      value = selectorObject;
+    }
+    isAggregateInvoked = true;
+  }
+
+  @Override
+  public Object get()
+  {
+    return value;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    return isNullResult ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue();
+  }
+
+  @Override
+  public long getLong()
+  {
+    return isNullResult ? NullHandling.ZERO_LONG : ((Number) value).longValue();

Review Comment:
   Let's add an assertion in the primitive getters: in SQL-compatible mode, they should not be called when the value is null.
   ```suggestion
         assert NullHandling.replaceWithDefault() || !isNull();
         return isNull() ? NullHandling.ZERO_LONG : ((Number) value).longValue();
   ```



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+  final ColumnType columnType;
+  final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;
+
+  SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType)

Review Comment:
   This should be `public`, since the `SingleValueAggregator` constructor is also public.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+  final ColumnType columnType;
+  final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;
+
+  SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType)
+  {
+    this.selector = selector;
+    this.columnType = columnType;
+    this.typeStrategy = columnType.getNullableStrategy();
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, NullHandling.IS_NULL_BYTE);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+
+    int written = typeStrategy.write(
+        buf,
+        position,
+        getSelectorObject(),
+        columnType.isNumeric() ? Double.BYTES + Byte.BYTES : SingleValueAggregatorFactory.DEFAULT_MAX_BUFFER_SIZE
+    );
+    if (written < 0) {
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build("Single Value Aggregator value exceeds buffer limit");
+    }
+    isAggregateInvoked = true;
+  }
+
+  @Nullable
+  private Object getSelectorObject()
+  {
+    if (columnType.isNumeric() && selector.isNull()) {
+      return null;
+    }
+    switch (columnType.getType()) {
+      case LONG:
+        return selector.getLong();
+      case FLOAT:
+        return selector.getFloat();
+      case DOUBLE:
+        return selector.getDouble();
+      default:
+        return selector.getObject();
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return typeStrategy.read(buf, position);
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    return TypeStrategies.isNullableNull(buf, position)

Review Comment:
   I wonder if two calls are required. Since we already have the nullable type strategy, perhaps we can read the value once and check whether the result is null. Not important, though, since it's called for a single row only.
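
   A rough sketch of the single-read idea, under stated assumptions. `ReadOnceSketch` is a hypothetical stand-in for a nullable type strategy, and the layout (one null-marker byte followed by an 8-byte double) and the marker values are assumed here purely for illustration; none of this is the PR's code.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a nullable type strategy: one null-marker byte, then a double.
final class ReadOnceSketch
{
  static final byte IS_NULL_BYTE = 1;       // assumed marker values, for illustration only
  static final byte IS_NOT_NULL_BYTE = 0;

  // Returns null when the marker byte flags the slot as null, otherwise the stored double.
  static Double read(ByteBuffer buf, int position)
  {
    if (buf.get(position) == IS_NULL_BYTE) {
      return null;
    }
    return buf.getDouble(position + Byte.BYTES);
  }

  // Single read: the null check is made on the returned object instead of a second buffer lookup.
  static float getFloat(ByteBuffer buf, int position)
  {
    Double stored = read(buf, position);
    return stored == null ? 0.0f : stored.floatValue();
  }
}
```

   The trade-off is one boxed read versus a null-marker check followed by a typed read; for a single row either is fine.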



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+  final ColumnType columnType;
+  final NullableTypeStrategy typeStrategy;
+  private boolean isAggregateInvoked = false;
+
+  SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType)
+  {
+    this.selector = selector;
+    this.columnType = columnType;
+    this.typeStrategy = columnType.getNullableStrategy();

Review Comment:
   This would throw an NPE if `columnType` is null, so we should add a null check in the aggregator factory.
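
   As a sketch of such a check: the factory already validates `name` and `fieldName` with Guava's `Preconditions.checkNotNull`, so `columnType` could get the same treatment. To stay self-contained, the hypothetical stand-in below uses `java.util.Objects.requireNonNull` instead, and `Object` in place of `ColumnType`; the error message for `columnType` is my own wording.

```java
import java.util.Objects;

// Hypothetical stand-in for the constructor checks in SingleValueAggregatorFactory.
final class FactoryNullCheckSketch
{
  private final String name;
  private final String fieldName;
  private final Object columnType;   // stands in for ColumnType

  FactoryNullCheckSketch(String name, String fieldName, Object columnType)
  {
    this.name = Objects.requireNonNull(name, "Must have a valid, non-null aggregator name");
    this.fieldName = Objects.requireNonNull(fieldName, "Must have a valid, non-null fieldName");
    // The added check: fail at construction time rather than later inside the buffer aggregator.
    this.columnType = Objects.requireNonNull(columnType, "Must have a valid, non-null columnType");
  }
}
```

   Failing in the factory keeps the error close to the malformed query spec instead of surfacing as an NPE from `columnType.getNullableStrategy()`.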




