LakshSingla commented on code in PR #15700:
URL: https://github.com/apache/druid/pull/15700#discussion_r1475643830


##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+
+  @Nullable
+  Object value;
+
+  private boolean isNullResult = true;
+
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+    boolean isNotNull = (selector.getObject() != null);
+    if (isNotNull) {
+      isNullResult = false;
+      value = selector.getObject();

Review Comment:
   Should probably reuse the result of the `.getObject()` call above, because for primitives a second call would cause another autoboxing.
   
   I was going to suggest an `.isNull()` check at the top instead; however, I don't think that behaves correctly for object selectors, so I'm refraining from that suggestion.
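   A minimal sketch of what I mean (simplified stand-in types, not Druid's actual classes; `Supplier` plays the role of `ColumnValueSelector` here):

```java
import java.util.function.Supplier;

// Hypothetical sketch: call getObject() once and reuse the result, since for
// primitive selectors a second call would autobox again.
class SingleValueSketch {
  private final Supplier<Object> selector; // stand-in for ColumnValueSelector
  Object value;
  boolean isNullResult = true;
  boolean isAggregateInvoked = false;

  SingleValueSketch(Supplier<Object> selector) {
    this.selector = selector;
  }

  void aggregate() {
    if (isAggregateInvoked) {
      throw new IllegalStateException("Subquery expression returned more than one row");
    }
    final Object observed = selector.get(); // single call, reused below
    if (observed != null) {
      isNullResult = false;
      value = observed;
    }
    isAggregateInvoked = true;
  }
}
```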



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+
+  @Nullable
+  Object value;
+
+  private boolean isNullResult = true;
+

Review Comment:
   nit: Spacing not needed



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregator.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+
+public class SingleValueAggregator implements Aggregator
+{
+  final ColumnValueSelector selector;
+
+  @Nullable
+  Object value;
+
+  private boolean isNullResult = true;
+
+  private boolean isAggregateInvoked = false;
+
+  public SingleValueAggregator(ColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+    boolean isNotNull = (selector.getObject() != null);
+    if (isNotNull) {
+      isNullResult = false;
+      value = selector.getObject();
+    }
+    isAggregateInvoked = true;
+  }
+
+  @Override
+  public Object get()
+  {
+    return value;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    if (isNullResult) {
+      throw DruidException.defensive("Cannot return float for Null Value");
+    }
+    return (float) value;
+  }
+
+  @Override
+  public long getLong()
+  {
+    if (isNullResult) {
+      throw DruidException.defensive("Cannot return long for Null Value");
+    }
+    return (long) value;
+  }
+
+  @Override
+  public double getDouble()
+  {
+    if (isNullResult) {
+      throw DruidException.defensive("Cannot return double for Null Value");
+    }

Review Comment:
   For SQL-incompatible behavior, let's probably just return the default value. The onus is on the caller to call `.isNull()` before `.getDouble()`. I am wondering whether any SQL-compatible path that calls `.getDouble()` without first calling `.isNull()` could cause this to throw. In any case, returning the default also avoids an extra check on this path. The same goes for the other getters.
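   Roughly this shape (a hypothetical stand-in, not Druid's API; the real default would come from `NullHandling`):

```java
// Hypothetical sketch of the suggested behavior: when no non-null value was
// aggregated, getDouble() returns the type's default (0) instead of throwing,
// leaving it to callers to check isNull() first.
class DefaultingDoubleAggregator {
  private Double value; // null means nothing was aggregated yet

  void offer(Double v) {
    value = v;
  }

  boolean isNull() {
    return value == null;
  }

  double getDouble() {
    // Default-value (SQL-incompatible) path: no throwing null check
    return value == null ? 0.0d : value;
  }
}
```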



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SingleValueSqlAggregator.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.aggregation.builtin;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.SingleValueAggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.planner.Calcites;
+
+import javax.annotation.Nullable;
+
+public class SingleValueSqlAggregator extends SimpleSqlAggregator

Review Comment:
   nit: Can add javadoc describing how this gets invoked by the SQL planner, since it's not something the user supplies directly.



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java:
##########
@@ -1042,4 +1049,312 @@ public void testJoinWithSubqueries()
         results
     );
   }
+
+  @Test
+  public void testSingleValueFloatAgg()
+  {
+    msqIncompatible();
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT count(*) FROM foo where m1 <= (select min(m1) + 4 from foo)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.DATASOURCE1),
+                      new QueryDataSource(
+                          GroupByQuery.builder()
+                                      .setDataSource(new QueryDataSource(
+                                          Druids.newTimeseriesQueryBuilder()
+                                                .dataSource(CalciteTests.DATASOURCE1)
+                                                .intervals(querySegmentSpec(Filtration.eternity()))
+                                                .granularity(Granularities.ALL)
+                                                .aggregators(new FloatMinAggregatorFactory("a0", "m1"))
+                                                .build()
+                                      ))
+                                      .setInterval(querySegmentSpec(Filtration.eternity()))
+                                      .setGranularity(Granularities.ALL)
+                                      .setVirtualColumns(expressionVirtualColumn("v0", "(\"a0\" + 4)", ColumnType.FLOAT))
+                                      .setAggregatorSpecs(aggregators(new SingleValueAggregatorFactory("_a0", "v0", ColumnType.FLOAT)))
+                                      .setLimitSpec(NoopLimitSpec.instance())
+                                      .setContext(QUERY_CONTEXT_DEFAULT)
+                                      .build()
+                      ),
+                      "j0.",
+                      "1",
+                      NullHandling.replaceWithDefault() ? JoinType.LEFT : JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .filters(expressionFilter("(\"m1\" <= \"j0._a0\")"))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{5L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueDoubleAgg()
+  {
+    msqIncompatible();
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT count(*) FROM foo where m1 >= (select max(m1) - 3.5 from foo)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.DATASOURCE1),
+                      new QueryDataSource(
+                          GroupByQuery.builder()
+                                      .setDataSource(new QueryDataSource(
+                                          Druids.newTimeseriesQueryBuilder()
+                                                .dataSource(CalciteTests.DATASOURCE1)
+                                                .intervals(querySegmentSpec(Filtration.eternity()))
+                                                .granularity(Granularities.ALL)
+                                                .aggregators(new FloatMaxAggregatorFactory("a0", "m1"))
+                                                .build()
+                                      ))
+                                      .setInterval(querySegmentSpec(Filtration.eternity()))
+                                      .setGranularity(Granularities.ALL)
+                                      .setVirtualColumns(expressionVirtualColumn("v0", "(\"a0\" - 3.5)", ColumnType.DOUBLE))
+                                      .setAggregatorSpecs(aggregators(new SingleValueAggregatorFactory("_a0", "v0", ColumnType.DOUBLE)))
+                                      .setLimitSpec(NoopLimitSpec.instance())
+                                      .setContext(QUERY_CONTEXT_DEFAULT)
+                                      .build()
+                      ),
+                      "j0.",
+                      "1",
+                      NullHandling.replaceWithDefault() ? JoinType.LEFT : JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .filters(expressionFilter("(\"m1\" >= \"j0._a0\")"))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{4L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueLongAgg()
+  {
+    msqIncompatible();
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT count(*) FROM wikipedia where __time >= (select max(__time) - INTERVAL '10' MINUTE from wikipedia)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.WIKIPEDIA),
+                      new QueryDataSource(
+                          GroupByQuery.builder()
+                                      .setDataSource(new QueryDataSource(
+                                          Druids.newTimeseriesQueryBuilder()
+                                                .dataSource(CalciteTests.WIKIPEDIA)
+                                                .intervals(querySegmentSpec(Filtration.eternity()))
+                                                .granularity(Granularities.ALL)
+                                                .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
+                                                .build()
+                                      ))
+                                      .setInterval(querySegmentSpec(Filtration.eternity()))
+                                      .setGranularity(Granularities.ALL)
+                                      .setVirtualColumns(expressionVirtualColumn("v0", "(\"a0\" - 600000)", ColumnType.LONG))
+                                      .setAggregatorSpecs(aggregators(new SingleValueAggregatorFactory("_a0", "v0", ColumnType.LONG)))
+                                      .setLimitSpec(NoopLimitSpec.instance())
+                                      .setContext(QUERY_CONTEXT_DEFAULT)
+                                      .build()
+                      ),
+                      "j0.",
+                      "1",
+                      NullHandling.replaceWithDefault() ? JoinType.LEFT : JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .filters(expressionFilter("(\"__time\" >= \"j0._a0\")"))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{220L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueStringAgg()
+  {
+    msqIncompatible();
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT  count(*) FROM wikipedia where channel = (select channel from wikipedia order by __time desc LIMIT 1 OFFSET 6)",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(join(
+                      new TableDataSource(CalciteTests.WIKIPEDIA),
+                      new QueryDataSource(
+                          GroupByQuery.builder()
+                                      .setDataSource(new QueryDataSource(
+                                          Druids.newScanQueryBuilder()
+                                                .dataSource(CalciteTests.WIKIPEDIA)
+                                                .intervals(querySegmentSpec(Filtration.eternity()))
+                                                .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                .offset(6L)
+                                                .limit(1L)
+                                                .order(ScanQuery.Order.DESCENDING)
+                                                .columns("__time", "channel")
+                                                .legacy(false)
+                                                .context(QUERY_CONTEXT_DEFAULT)
+                                                .build()
+                                      ))
+                                      .setInterval(querySegmentSpec(Filtration.eternity()))
+                                      .setGranularity(Granularities.ALL)
+                                      .setVirtualColumns(expressionVirtualColumn("v0", "\"channel\"", ColumnType.STRING))
+                                      .setAggregatorSpecs(aggregators(new SingleValueAggregatorFactory("a0", "v0", ColumnType.STRING)))
+                                      .setLimitSpec(NoopLimitSpec.instance())
+                                      .setContext(QUERY_CONTEXT_DEFAULT)
+                                      .build()
+                      ),
+                      "j0.",
+                      "(\"channel\" == \"j0.a0\")",
+                      JoinType.INNER
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1256L}
+        )
+    );
+  }
+
+  @Test
+  public void testSingleValueStringMultipleRowsAgg()
+  {
+    msqIncompatible();
+    skipVectorize();
+    cannotVectorize();
+    testQueryThrows(
+        "SELECT  count(*) FROM wikipedia where channel = (select channel from wikipedia order by __time desc LIMIT 2 OFFSET 6)",
+        exception -> exception.expectMessage("Subquery expression returned more than one row")
+    );
+  }
+
+  @Test
+  public void testSingleValueEmptyInnerAgg()
+  {
+    msqIncompatible();

Review Comment:
   Why `msqIncompatible()`? I don't think this test class is subclassed by MSQ anyway, so we can get rid of these calls; that said, I'd expect the test to be MSQ-compatible.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+
+  final ColumnType columnType;
+
+  final NullableTypeStrategy typeStrategy;
+
+  private boolean isAggregateInvoked = false;
+
+  SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType)
+  {
+    this.selector = selector;
+    this.columnType = columnType;
+    this.typeStrategy = columnType.getNullableStrategy();
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.put(position, NullHandling.IS_NULL_BYTE);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    if (isAggregateInvoked) {
+      throw InvalidInput.exception("Subquery expression returned more than one row");
+    }
+
+    int written = typeStrategy.write(
+        buf,
+        position,
+        getSelectorObject(),
+        SingleValueAggregatorFactory.DEFAULT_MAX_STRING_SIZE + Byte.BYTES
+    );
+    if (written < 0) {
+      throw new ISE("Single Value Aggregator value exceeds buffer limit");

Review Comment:
   It shouldn't be an ISE; this is something the user will encounter if the subquery returns a lengthy string in the result. The error message should (perhaps) be targeted at the `ADMIN` persona, stating the inadequacy of the aggregator's buffer.
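   Something along these lines (a simplified stand-in, not Druid's `DruidException` builder; the class name and the 1024 limit here are made up for illustration):

```java
// Hypothetical sketch: replace the generic ISE with an operator-facing error
// that explains the aggregator's buffer limit.
class SingleValueBufferLimitException extends RuntimeException {
  SingleValueBufferLimitException(int maxBytes) {
    super("Subquery result exceeds the single-value aggregator's " + maxBytes
          + "-byte buffer; the returned value (e.g. a long string) is too large"
          + " for this aggregator");
  }
}

class BufferWriteCheck {
  static final int MAX_BYTES = 1024; // stand-in for the factory's max size constant

  // Mirrors the `written < 0` check in aggregate(): a negative return from
  // the type strategy's write() means the value did not fit in the buffer.
  static void checkWritten(int written) {
    if (written < 0) {
      throw new SingleValueBufferLimitException(MAX_BYTES);
    }
  }
}
```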



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueBufferAggregator.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategies;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueBufferAggregator implements BufferAggregator
+{
+  final ColumnValueSelector selector;
+
+  final ColumnType columnType;
+
+  final NullableTypeStrategy typeStrategy;
+
+  private boolean isAggregateInvoked = false;

Review Comment:
   nit: too much spacing



##########
processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+  */
+public class SingleValueAggregationTest
+{
+  private SingleValueAggregatorFactory longAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryLong;
+  private ColumnCapabilities columnCapabilitiesLong;
+  private TestLongColumnSelector selectorLong;
+
+  private SingleValueAggregatorFactory doubleAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryDouble;
+  private ColumnCapabilities columnCapabilitiesDouble;
+  private TestDoubleColumnSelectorImpl selectorDouble;
+
+  private SingleValueAggregatorFactory stringAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryString;
+  private ColumnCapabilities columnCapabilitiesString;
+  private TestObjectColumnSelector selectorString;
+
+  private final long[] longValues = {9223372036854775802L, 9223372036854775803L};
+  private final double[] doubleValues = {5.2d, 2.8976552d};
+  private final String[] strValues = {"str1", "str2"};
+
+  public SingleValueAggregationTest() throws Exception
+  {
+    String longAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"lng\", \"fieldName\": \"lngFld\", \"columnType\": \"LONG\"}";
+    longAggFatory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class);
+
+    String doubleAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"dbl\", \"fieldName\": \"dblFld\", \"columnType\": \"DOUBLE\"}";
+    doubleAggFatory = TestHelper.makeJsonMapper().readValue(doubleAggSpecJson, SingleValueAggregatorFactory.class);
+
+    String strAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"str\", \"fieldName\": \"strFld\", \"columnType\": \"STRING\"}";
+    stringAggFatory = TestHelper.makeJsonMapper().readValue(strAggSpecJson, SingleValueAggregatorFactory.class);
+  }

Review Comment:
   nit: spelling (`longAggFatory` → `longAggFactory`, likewise the other `*AggFatory` fields)



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.columnType = columnType;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    return new SingleValueAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName);
+    ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null);
+    return new SingleValueBufferAggregator(selector, columnType);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare");
+  }
+
+  @Override
+  @Nullable
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to combine");
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new SingleValueAggregatorFactory(name, name, columnType);
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    return object;
+  }
+
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable Object object)
+  {
+    return object;
+  }
+
+  @Override
+  public ColumnType getIntermediateType()
+  {
+    return columnType;
+  }
+
+  @Override
+  public ColumnType getResultType()
+  {
+    return columnType;
+  }
+
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    if (columnType.is(ValueType.STRING)) {
+      return Byte.BYTES + DEFAULT_MAX_STRING_SIZE;
+    }
+    return Byte.BYTES + Double.BYTES;

Review Comment:
   This seems incorrect for ComplexTypes (which can take 
`DEFAULT_MAX_STRING_SIZE`), and longs and floats. 
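To make the sizing concern concrete, here is a minimal sketch of per-type intermediate sizing. The `ValueKind` enum, the `MAX_VARIABLE_SIZE` name, and the idea that complex types share the string budget are illustrative assumptions, not Druid's actual API:

```java
// Hypothetical sketch of what getMaxIntermediateSize() could return per type.
// Every branch reserves one extra byte for a null marker.
final class IntermediateSizing {
    enum ValueKind { LONG, FLOAT, DOUBLE, STRING, COMPLEX }

    // Assumed shared budget for variable-size values (strings and complex types)
    static final int MAX_VARIABLE_SIZE = 1024;

    static int maxIntermediateSize(ValueKind kind) {
        switch (kind) {
            case LONG:
                return Byte.BYTES + Long.BYTES;       // 1 + 8 = 9 bytes
            case FLOAT:
                return Byte.BYTES + Float.BYTES;      // 1 + 4 = 5 bytes
            case DOUBLE:
                return Byte.BYTES + Double.BYTES;     // 1 + 8 = 9 bytes
            case STRING:
            case COMPLEX:
                return Byte.BYTES + MAX_VARIABLE_SIZE; // 1 + 1024 = 1025 bytes
            default:
                throw new IllegalArgumentException("Unexpected type: " + kind);
        }
    }
}
```

Under these assumptions, the numeric types fit within the `Byte.BYTES + Double.BYTES` fallback in the current code, but a complex value would need the variable-size budget rather than 9 bytes, which is the incorrectness being pointed out.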



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;

Review Comment:
   If possible, we should call it out somewhere. Also, afaict this is not restricted to strings (it also applies to complex types), so let's rename this.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.columnType = columnType;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    return new SingleValueAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName);
+    ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null);
+    return new SingleValueBufferAggregator(selector, columnType);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare");
+  }
+
+  @Override
+  @Nullable
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to combine");
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new SingleValueAggregatorFactory(name, name, columnType);
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    return object;
+  }
+
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable Object object)
+  {
+    return object;
+  }
+
+  @Override
+  public ColumnType getIntermediateType()
+  {
+    return columnType;
+  }
+
+  @Override
+  public ColumnType getResultType()
+  {
+    return columnType;
+  }
+

Review Comment:
   nit: Spacing



##########
processing/src/main/java/org/apache/druid/query/aggregation/SingleValueAggregatorFactory.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant to wrap the subquery used as an expression into a single value
+ * and is expected to throw an exception when the subquery results in more than one row
+ *
+ * <p>
+ * This consumes columnType as well along with name and fieldName to pass it on to underlying
+ * {@link SingleValueBufferAggregator} to work with different ColumnTypes
+ */
+@JsonTypeName("singleValue")
+public class SingleValueAggregatorFactory extends AggregatorFactory
+{
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String name;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final String fieldName;
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ColumnType columnType;
+
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+  @JsonCreator
+  public SingleValueAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("columnType") final ColumnType columnType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.columnType = columnType;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    return new SingleValueAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName);
+    ColumnType columnType = new ColumnType(columnCapabilities.getType(), null, null);
+    return new SingleValueBufferAggregator(selector, columnType);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    throw DruidException.defensive("Single Value Aggregator would not have more than one row to compare");
+  }
+
+  @Override
+  @Nullable
+  public Object combine(@Nullable Object lhs, @Nullable Object rhs)

Review Comment:
   What would happen if `lhs` represents something empty (i.e. hasn't encountered a single row) while `rhs` has encountered a single row? In that case `rhs` should be returned. Similarly for the cases where `lhs` has encountered a single row while `rhs` is empty, or where both are empty. I am not sure how we would encode "that something is empty", hence the confusion.
   
   This is also at odds with `getCombiningFactory`, since that doesn't throw upon being called and returns a correct implementation; I assumed that both should have the same semantics. The calling patterns might be such that it's okay to not implement `.combine()` and throw an exception here, and if so, I think there should be a comment explaining why.
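One way to encode "empty" is a dedicated sentinel value. A minimal sketch of the combine semantics described above, under the assumption that such a sentinel exists (Druid has no `EMPTY` like this; every name here is illustrative):

```java
// Hypothetical combine() for a single-value aggregation where either side
// may not have seen a row yet, encoded with a sentinel object.
final class SingleValueCombiner {
    // Sentinel for "no row seen"; how to encode emptiness is exactly the open question.
    static final Object EMPTY = new Object();

    static Object combine(Object lhs, Object rhs) {
        if (lhs == EMPTY && rhs == EMPTY) {
            return EMPTY;  // neither side saw a row
        }
        if (lhs == EMPTY) {
            return rhs;    // only the right side saw a row
        }
        if (rhs == EMPTY) {
            return lhs;    // only the left side saw a row
        }
        // both sides saw a row: the subquery produced more than one row
        throw new IllegalStateException("Subquery expression returned more than one row");
    }
}
```

The three empty/non-empty cases return the surviving value, and only the "both non-empty" case throws, which is the behavior the comment argues a non-throwing `combine()` would need.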



##########
processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+  */
+public class SingleValueAggregationTest
+{
+  private SingleValueAggregatorFactory longAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryLong;
+  private ColumnCapabilities columnCapabilitiesLong;
+  private TestLongColumnSelector selectorLong;
+
+  private SingleValueAggregatorFactory doubleAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryDouble;
+  private ColumnCapabilities columnCapabilitiesDouble;
+  private TestDoubleColumnSelectorImpl selectorDouble;
+
+  private SingleValueAggregatorFactory stringAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryString;
+  private ColumnCapabilities columnCapabilitiesString;
+  private TestObjectColumnSelector selectorString;
+
+  private final long[] longValues = {9223372036854775802L, 9223372036854775803L};
+  private final double[] doubleValues = {5.2d, 2.8976552d};
+  private final String[] strValues = {"str1", "str2"};
+
+  public SingleValueAggregationTest() throws Exception
+  {
+    String longAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"lng\", \"fieldName\": \"lngFld\", \"columnType\": \"LONG\"}";
+    longAggFatory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class);
+
+    String doubleAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"dbl\", \"fieldName\": \"dblFld\", \"columnType\": \"DOUBLE\"}";
+    doubleAggFatory = TestHelper.makeJsonMapper().readValue(doubleAggSpecJson, SingleValueAggregatorFactory.class);
+
+    String strAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"str\", \"fieldName\": \"strFld\", \"columnType\": \"STRING\"}";
+    stringAggFatory = TestHelper.makeJsonMapper().readValue(strAggSpecJson, SingleValueAggregatorFactory.class);
+  }
+
+  @Before
+  public void setup()
+  {
+    NullHandling.initializeForTests();

Review Comment:
   nit: Instead of calling it in setup(), we can have the test class extend `InitializedNullHandlingTest`



##########
processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+  */

Review Comment:
   nit: cleanup



##########
processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+  */
+public class SingleValueAggregationTest
+{
+  private SingleValueAggregatorFactory longAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryLong;
+  private ColumnCapabilities columnCapabilitiesLong;
+  private TestLongColumnSelector selectorLong;
+
+  private SingleValueAggregatorFactory doubleAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryDouble;
+  private ColumnCapabilities columnCapabilitiesDouble;
+  private TestDoubleColumnSelectorImpl selectorDouble;
+
+  private SingleValueAggregatorFactory stringAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryString;
+  private ColumnCapabilities columnCapabilitiesString;
+  private TestObjectColumnSelector selectorString;
+
+  private final long[] longValues = {9223372036854775802L, 9223372036854775803L};
+  private final double[] doubleValues = {5.2d, 2.8976552d};
+  private final String[] strValues = {"str1", "str2"};
+
+  public SingleValueAggregationTest() throws Exception
+  {
+    String longAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"lng\", \"fieldName\": \"lngFld\", \"columnType\": \"LONG\"}";
+    longAggFatory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class);

Review Comment:
   ```suggestion
       longAggFactory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class);
   ```



##########
processing/src/test/java/org/apache/druid/query/aggregation/SingleValueAggregationTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ValueType;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+  */
+public class SingleValueAggregationTest
+{
+  private SingleValueAggregatorFactory longAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryLong;
+  private ColumnCapabilities columnCapabilitiesLong;
+  private TestLongColumnSelector selectorLong;
+
+  private SingleValueAggregatorFactory doubleAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryDouble;
+  private ColumnCapabilities columnCapabilitiesDouble;
+  private TestDoubleColumnSelectorImpl selectorDouble;
+
+  private SingleValueAggregatorFactory stringAggFatory;
+  private ColumnSelectorFactory colSelectorFactoryString;
+  private ColumnCapabilities columnCapabilitiesString;
+  private TestObjectColumnSelector selectorString;
+
+  private final long[] longValues = {9223372036854775802L, 9223372036854775803L};
+  private final double[] doubleValues = {5.2d, 2.8976552d};
+  private final String[] strValues = {"str1", "str2"};
+
+  public SingleValueAggregationTest() throws Exception
+  {
+    String longAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"lng\", \"fieldName\": \"lngFld\", \"columnType\": \"LONG\"}";
+    longAggFatory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class);
+
+    String doubleAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"dbl\", \"fieldName\": \"dblFld\", \"columnType\": \"DOUBLE\"}";
+    doubleAggFatory = TestHelper.makeJsonMapper().readValue(doubleAggSpecJson, SingleValueAggregatorFactory.class);
+
+    String strAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"str\", \"fieldName\": \"strFld\", \"columnType\": \"STRING\"}";
+    stringAggFatory = TestHelper.makeJsonMapper().readValue(strAggSpecJson, SingleValueAggregatorFactory.class);
+  }
+
+  @Before
+  public void setup()
+  {
+    NullHandling.initializeForTests();
+
+    selectorLong = new TestLongColumnSelector(longValues);
+    columnCapabilitiesLong = EasyMock.createMock(ColumnCapabilities.class);
+    EasyMock.expect(columnCapabilitiesLong.getType()).andReturn(ValueType.LONG);
+
+    colSelectorFactoryLong = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactoryLong.makeColumnValueSelector("lngFld")).andReturn(selectorLong);
+    EasyMock.expect(colSelectorFactoryLong.getColumnCapabilities("lngFld")).andReturn(columnCapabilitiesLong);
+
+    EasyMock.replay(columnCapabilitiesLong);
+    EasyMock.replay(colSelectorFactoryLong);
+
+    selectorDouble = new TestDoubleColumnSelectorImpl(doubleValues);
+    columnCapabilitiesDouble = EasyMock.createMock(ColumnCapabilities.class);
+    EasyMock.expect(columnCapabilitiesDouble.getType()).andReturn(ValueType.DOUBLE);
+
+    colSelectorFactoryDouble = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactoryDouble.makeColumnValueSelector("dblFld")).andReturn(selectorDouble);
+    EasyMock.expect(colSelectorFactoryDouble.getColumnCapabilities("dblFld")).andReturn(columnCapabilitiesDouble);
+
+    EasyMock.replay(columnCapabilitiesDouble);
+    EasyMock.replay(colSelectorFactoryDouble);

Review Comment:
   There should be a cleaner way than mocking the selector factory and the column capabilities. For column capabilities, you can use `ColumnCapabilitiesImpl#createSimpleNumericColumnCapabilities` et al. While going through the code looking for a reusable column selector factory, I found `TestColumnSelectorFactory`. Perhaps others can suggest how to achieve this more cleanly.
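To illustrate the hand-rolled-fake approach the comment is gesturing at (independent of Druid's actual `TestColumnSelectorFactory`, whose API may differ), a canned column source can replace the expect/replay bookkeeping; all names below are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical hand-rolled fake: canned per-field values and an explicit
// row cursor, with no mocking framework involved.
final class FakeColumnSource {
    private final Map<String, Object[]> columns = new HashMap<>();
    private int row = 0;

    // Register canned values for a field; returns this for chaining.
    FakeColumnSource add(String field, Object... values) {
        columns.put(field, values);
        return this;
    }

    // Current value of the field at the cursor position.
    Object value(String field) {
        return columns.get(field)[row];
    }

    // Move the cursor to the next row.
    void advance() {
        row++;
    }
}
```

A fake like this keeps the per-field test data next to the test body and avoids the replay/verify lifecycle entirely, which tends to read more clearly than a sequence of `EasyMock.expect(...)` calls.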



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
