This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f2ce769  add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function 
signatures less ambiguous (#12145)
f2ce769 is described below

commit f2ce76966cf06134dbe712782edf46a7c8d72563
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jan 12 03:48:53 2022 -0800

    add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures less 
ambiguous (#12145)
    
    * add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures 
unambiguous
    
    * switcheroo
    
    * EARLIEST_BY/LATEST_BY use timestamp instead of numeric types, update docs
    
    * revert unintended change
    
    * fix docs
    
    * fix docs better
---
 docs/querying/sql.md                               |  12 +-
 .../aggregation/first/NumericFirstAggregator.java  |   3 +
 .../first/NumericFirstBufferAggregator.java        |   3 +
 .../aggregation/first/StringFirstAggregator.java   |   3 +
 .../first/StringFirstBufferAggregator.java         |   3 +
 .../aggregation/last/NumericLastAggregator.java    |   3 +
 .../last/NumericLastBufferAggregator.java          |   3 +
 .../aggregation/last/StringLastAggregator.java     |   3 +
 .../last/StringLastBufferAggregator.java           |   3 +
 .../builtin/EarliestLatestAnySqlAggregator.java    |  41 +----
 .../builtin/EarliestLatestBySqlAggregator.java     | 181 +++++++++++++++++++++
 .../sql/calcite/planner/DruidOperatorTable.java    |   3 +
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  70 +++++---
 13 files changed, 268 insertions(+), 63 deletions(-)

diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index f0e769a..e227757 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -364,14 +364,14 @@ In the aggregation functions supported by Druid, only 
`COUNT`, `ARRAY_AGG`, and
 |`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See 
[stats extension](../development/extensions-core/stats.md) documentation for 
additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `0`|
 |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats 
extension](../development/extensions-core/stats.md) documentation for 
additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `0`|
 |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats 
extension](../development/extensions-core/stats.md) documentation for 
additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `0`|
-|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. 
If `expr` comes from a relation with a timestamp column (like a Druid 
datasource) then "earliest" is the value first encountered with the minimum 
overall timestamp of all values being aggregated. If `expr` does not come from 
a relation with a timestamp, then it is simply the first value 
encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise 
`0`|
-|`EARLIEST(expr, timeColumn)`|Returns the earliest value of `expr`, which must 
be numeric. Earliest value is defined as the value first encountered with the 
minimum overall value of time column of all values being aggregated.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. 
If `expr` comes from a relation with a timestamp column (like `__time` in a 
Druid datasource), the "earliest" is taken from the row with the overall 
earliest non-null value of the timestamp column. If the earliest non-null value 
of the timestamp column appears in multiple rows, the `expr` may be taken from 
any of those rows. If `expr` does not come from a relation with a timestamp, 
then it is simply the first  [...]
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. 
The `maxBytesPerString` parameter determines how much aggregation space to 
allocate per string. Strings longer than this limit will be truncated. This 
parameter should be set as low as possible, since high values will lead to 
wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `''`|
-|`EARLIEST(expr, maxBytesPerString, timeColumn)`|Like `EARLIEST(expr, 
timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how 
much aggregation space to allocate per string. Strings longer than this limit 
will be truncated. This parameter should be set as low as possible, since high 
values will lead to wasted memory.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If 
`expr` comes from a relation with a timestamp column (like a Druid datasource) 
then "latest" is the value last encountered with the maximum overall timestamp 
of all values being aggregated. If `expr` does not come from a relation with a 
timestamp, then it is simply the last value encountered.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
-|`LATEST(expr, timeColumn)`|Returns the latest value of `expr`, which must be 
numeric. Latest value is defined as the value last encountered with the maximum 
overall value of time column of all values being aggregated.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, 
which must be numeric. The earliest value of `expr` is taken from the row with 
the overall earliest non-null value of `timestampExpr`. If the earliest 
non-null value of `timestampExpr` appears in multiple rows, the `expr` may be 
taken from any of those rows.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like 
`EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` 
parameter determines how much aggregation space to allocate per string. Strings 
longer than this limit will be truncated. This parameter should be set as low 
as possible, since high values will lead to wasted memory.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `''`|
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If 
`expr` comes from a relation with a timestamp column (like `__time` in a Druid 
datasource), the "latest" is taken from the row with the overall latest 
non-null value of the timestamp column. If the latest non-null value of the 
timestamp column appears in multiple rows, the `expr` may be taken from any of 
those rows. If `expr` does not come from a relation with a timestamp, then it 
is simply the last value encou [...]
 |`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The 
`maxBytesPerString` parameter determines how much aggregation space to allocate 
per string. Strings longer than this limit will be truncated. This parameter 
should be set as low as possible, since high values will lead to wasted 
memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr, maxBytesPerString, timeColumn)`|Like `LATEST(expr, 
timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how 
much aggregation space to allocate per string. Strings longer than this limit 
will be truncated. This parameter should be set as low as possible, since high 
values will lead to wasted memory.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `''`|
+|`LATEST_BY(expr, timestampExpr)`|Returns the latest value of `expr`, which 
must be numeric. The latest value of `expr` is taken from the row with the 
overall latest non-null value of `timestampExpr`. If the overall latest 
non-null value of `timestampExpr` appears in multiple rows, the `expr` may be 
taken from any of those rows.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`LATEST_BY(expr, timestampExpr, maxBytesPerString)`|Like `LATEST_BY(expr, 
timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines 
how much aggregation space to allocate per string. Strings longer than this 
limit will be truncated. This parameter should be set as low as possible, since 
high values will lead to wasted memory.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`ANY_VALUE(expr)`|Returns any value of `expr` including null. `expr` must be 
numeric. This aggregator can simplify and optimize the performance by returning 
the first encountered value (including null)|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. 
The `maxBytesPerString` parameter determines how much aggregation space to 
allocate per string. Strings longer than this limit will be truncated. This 
parameter should be set as low as possible, since high values will lead to 
wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `''`|
 |`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy 
dimension is included in a row, when using `GROUPING SETS`. Refer to 
[additional documentation](aggregations.md#grouping-aggregator) on how to infer 
this number.|N/A|
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
index e7b60b9..c8a5374 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
@@ -54,6 +54,9 @@ public abstract class NumericFirstAggregator<TSelector 
extends BaseNullableColum
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     if (time < firstTime) {
       firstTime = time;
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
index ebb0a87..159c6e1 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
@@ -86,6 +86,9 @@ public abstract class NumericFirstBufferAggregator<TSelector 
extends BaseNullabl
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     long firstTime = buf.getLong(position);
     if (time < firstTime) {
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
index 5b581d5..8a6654f 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
@@ -56,6 +56,9 @@ public class StringFirstAggregator implements Aggregator
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
       // it's a foldable object).
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
index d84793e..fbf2a41 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -63,6 +63,9 @@ public class StringFirstBufferAggregator implements 
BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
       // it's a foldable object).
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
index 6506f97..14f424a 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
@@ -50,6 +50,9 @@ public abstract class NumericLastAggregator<TSelector extends 
BaseNullableColumn
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     if (time >= lastTime) {
       lastTime = time;
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
index 7c90aad..4b741e0 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
@@ -89,6 +89,9 @@ public abstract class NumericLastBufferAggregator<TSelector 
extends BaseNullable
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     long lastTime = buf.getLong(position);
     if (time >= lastTime) {
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
index 0c5fe33..a7c33c8 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
@@ -57,6 +57,9 @@ public class StringLastAggregator implements Aggregator
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
       // it's a foldable object).
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
index 9da9852..8611ef7 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -64,6 +64,9 @@ public class StringLastBufferAggregator implements 
BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
       // it's a foldable object).
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 224ecf4..48e305a 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -204,38 +204,20 @@ public class EarliestLatestAnySqlAggregator implements 
SqlAggregator
         theAggFactory = aggregatorType.createAggregatorFactory(aggregatorName, 
fieldName, null, outputType, -1);
         break;
       case 2:
-        if (!outputType.isNumeric()) { // translates (expr, maxBytesPerString) 
signature
-          theAggFactory = aggregatorType.createAggregatorFactory(
-              aggregatorName,
-              fieldName,
-              null,
-              outputType,
-              RexLiteral.intValue(rexNodes.get(1))
-          );
-        } else { // translates (expr, timeColumn) signature
-          theAggFactory = aggregatorType.createAggregatorFactory(
-              aggregatorName,
-              fieldName,
-              getColumnName(plannerContext, virtualColumnRegistry, 
args.get(1), rexNodes.get(1)),
-              outputType,
-              -1
-          );
-        }
-        break;
-      case 3:
         theAggFactory = aggregatorType.createAggregatorFactory(
             aggregatorName,
             fieldName,
-            getColumnName(plannerContext, virtualColumnRegistry, args.get(2), 
rexNodes.get(2)),
+            null,
             outputType,
             RexLiteral.intValue(rexNodes.get(1))
         );
         break;
       default:
         throw new IAE(
-            "aggregation[%s], Invalid number of arguments[%,d] to 
Earliest/Latest/Any operator",
+            "aggregation[%s], Invalid number of arguments[%,d] to [%s] 
operator",
             aggregatorName,
-            args.size()
+            args.size(),
+            aggregatorType.name()
         );
     }
 
@@ -245,7 +227,7 @@ public class EarliestLatestAnySqlAggregator implements 
SqlAggregator
     );
   }
 
-  private String getColumnName(
+  static String getColumnName(
       PlannerContext plannerContext,
       VirtualColumnRegistry virtualColumnRegistry,
       DruidExpression arg,
@@ -307,20 +289,9 @@ public class EarliestLatestAnySqlAggregator implements 
SqlAggregator
                   "'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
                   OperandTypes.ANY,
                   OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
-              ),
-              OperandTypes.sequence(
-                  "'" + aggregatorType.name() + "(expr, timeColumn)'\n",
-                  OperandTypes.ANY,
-                  OperandTypes.NUMERIC
-              ),
-              OperandTypes.sequence(
-                  "'" + aggregatorType.name() + "(expr, maxBytesPerString, 
timeColumn)'\n",
-                  OperandTypes.ANY,
-                  OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL),
-                  OperandTypes.NUMERIC
               )
           ),
-          SqlFunctionCategory.STRING,
+          SqlFunctionCategory.USER_DEFINED_FUNCTION,
           false,
           false,
           Optionality.FORBIDDEN
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
new file mode 100644
index 0000000..f13a918
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.aggregation.builtin;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.util.Optionality;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import 
org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class EarliestLatestBySqlAggregator implements SqlAggregator
+{
+  public static final SqlAggregator EARLIEST_BY = new 
EarliestLatestBySqlAggregator(EarliestLatestAnySqlAggregator.AggregatorType.EARLIEST);
+  public static final SqlAggregator LATEST_BY = new 
EarliestLatestBySqlAggregator(EarliestLatestAnySqlAggregator.AggregatorType.LATEST);
+
+  private final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType;
+  private final SqlAggFunction function;
+
+  private EarliestLatestBySqlAggregator(final 
EarliestLatestAnySqlAggregator.AggregatorType aggregatorType)
+  {
+    this.aggregatorType = aggregatorType;
+    this.function = new EarliestByLatestBySqlAggFunction(aggregatorType);
+  }
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return function;
+  }
+
+  @Nullable
+  @Override
+  public Aggregation toDruidAggregation(
+      final PlannerContext plannerContext,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
+      final RexBuilder rexBuilder,
+      final String name,
+      final AggregateCall aggregateCall,
+      final Project project,
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
+  )
+  {
+    final List<RexNode> rexNodes = aggregateCall
+        .getArgList()
+        .stream()
+        .map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
+        .collect(Collectors.toList());
+
+    final List<DruidExpression> args = 
Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);
+
+    if (args == null) {
+      return null;
+    }
+
+    final String aggregatorName = finalizeAggregations ? 
Calcites.makePrefixedName(name, "a") : name;
+    final ColumnType outputType = 
Calcites.getColumnTypeForRelDataType(aggregateCall.getType());
+    if (outputType == null) {
+      throw new ISE(
+          "Cannot translate output sqlTypeName[%s] to Druid type for 
aggregator[%s]",
+          aggregateCall.getType().getSqlTypeName(),
+          aggregateCall.getName()
+      );
+    }
+
+    final String fieldName = 
EarliestLatestAnySqlAggregator.getColumnName(plannerContext, 
virtualColumnRegistry, args.get(0), rexNodes.get(0));
+
+    final AggregatorFactory theAggFactory;
+    switch (args.size()) {
+      case 2:
+        theAggFactory = aggregatorType.createAggregatorFactory(
+            aggregatorName,
+            fieldName,
+            EarliestLatestAnySqlAggregator.getColumnName(plannerContext, 
virtualColumnRegistry, args.get(1), rexNodes.get(1)),
+            outputType,
+            -1
+        );
+        break;
+      case 3:
+        theAggFactory = aggregatorType.createAggregatorFactory(
+            aggregatorName,
+            fieldName,
+            EarliestLatestAnySqlAggregator.getColumnName(plannerContext, 
virtualColumnRegistry, args.get(1), rexNodes.get(1)),
+            outputType,
+            RexLiteral.intValue(rexNodes.get(2))
+        );
+        break;
+      default:
+        throw new IAE(
+            "aggregation[%s], Invalid number of arguments[%,d] to [%s] 
operator",
+            aggregatorName,
+            args.size(),
+            aggregatorType.name()
+        );
+    }
+
+    return Aggregation.create(
+        Collections.singletonList(theAggFactory),
+        finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, 
aggregatorName) : null
+    );
+  }
+
+  private static class EarliestByLatestBySqlAggFunction extends SqlAggFunction
+  {
+    private static final SqlReturnTypeInference 
EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
+        new 
EarliestLatestAnySqlAggregator.EarliestLatestReturnTypeInference(0);
+
+    
EarliestByLatestBySqlAggFunction(EarliestLatestAnySqlAggregator.AggregatorType 
aggregatorType)
+    {
+      super(
+          StringUtils.format("%s_BY", aggregatorType.name()),
+          null,
+          SqlKind.OTHER_FUNCTION,
+          EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE,
+          InferTypes.RETURN_TYPE,
+          OperandTypes.or(
+              OperandTypes.sequence(
+                  "'" + aggregatorType.name() + "(expr, timeColumn)'\n",
+                  OperandTypes.ANY,
+                  OperandTypes.family(SqlTypeFamily.TIMESTAMP)
+              ),
+              OperandTypes.sequence(
+                  "'" + aggregatorType.name() + "(expr, timeColumn, 
maxBytesPerString)'\n",
+                  OperandTypes.ANY,
+                  OperandTypes.family(SqlTypeFamily.TIMESTAMP),
+                  OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
+              )
+          ),
+          SqlFunctionCategory.USER_DEFINED_FUNCTION,
+          false,
+          false,
+          Optionality.FORBIDDEN
+      );
+    }
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
index 41ed11c..11244d4 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
@@ -38,6 +38,7 @@ import 
org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.BitwiseSqlAggregator;
 import 
org.apache.druid.sql.calcite.aggregation.builtin.BuiltinApproxCountDistinctSqlAggregator;
 import 
org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestAnySqlAggregator;
+import 
org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestBySqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.GroupingSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
@@ -133,6 +134,8 @@ public class DruidOperatorTable implements SqlOperatorTable
                    .add(EarliestLatestAnySqlAggregator.EARLIEST)
                    .add(EarliestLatestAnySqlAggregator.LATEST)
                    .add(EarliestLatestAnySqlAggregator.ANY_VALUE)
+                   .add(EarliestLatestBySqlAggregator.EARLIEST_BY)
+                   .add(EarliestLatestBySqlAggregator.LATEST_BY)
                    .add(new MinSqlAggregator())
                    .add(new MaxSqlAggregator())
                    .add(new SumSqlAggregator())
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 326c174..3c77792 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -609,12 +609,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         "SELECT "
         + "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), "
         + "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS 
VARCHAR), 10), "
-        + "EARLIEST(cnt, m1), EARLIEST(m1, m1), EARLIEST(dim1, 10, m1), "
-        + "EARLIEST(cnt + 1, m1), EARLIEST(m1 + 1, m1), EARLIEST(dim1 || 
CAST(cnt AS VARCHAR), 10, m1) "
-        + "FROM druid.foo",
+        + "EARLIEST_BY(cnt, MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(m1, 
MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(dim1, MILLIS_TO_TIMESTAMP(l1), 10), "
+        + "EARLIEST_BY(cnt + 1, MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(m1 + 1, 
MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(dim1 || CAST(cnt AS VARCHAR), 
MILLIS_TO_TIMESTAMP(l1), 10) "
+        + "FROM druid.numfoo",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
-                  .dataSource(CalciteTests.DATASOURCE1)
+                  .dataSource(CalciteTests.DATASOURCE3)
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .granularity(Granularities.ALL)
                   .virtualColumns(
@@ -630,19 +630,19 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                           new LongFirstAggregatorFactory("a3", "v0", null),
                           new FloatFirstAggregatorFactory("a4", "v1", null),
                           new StringFirstAggregatorFactory("a5", "v2", null, 
10),
-                          new LongFirstAggregatorFactory("a6", "cnt", "m1"),
-                          new FloatFirstAggregatorFactory("a7", "m1", "m1"),
-                          new StringFirstAggregatorFactory("a8", "dim1", "m1", 
10),
-                          new LongFirstAggregatorFactory("a9", "v0", "m1"),
-                          new FloatFirstAggregatorFactory("a10", "v1", "m1"),
-                          new StringFirstAggregatorFactory("a11", "v2", "m1", 
10)
+                          new LongFirstAggregatorFactory("a6", "cnt", "l1"),
+                          new FloatFirstAggregatorFactory("a7", "m1", "l1"),
+                          new StringFirstAggregatorFactory("a8", "dim1", "l1", 
10),
+                          new LongFirstAggregatorFactory("a9", "v0", "l1"),
+                          new FloatFirstAggregatorFactory("a10", "v1", "l1"),
+                          new StringFirstAggregatorFactory("a11", "v2", "l1", 
10)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
                   .build()
         ),
         ImmutableList.of(
-            new Object[]{1L, 1.0f, "", 2L, 2.0f, "1", 1L, 1.0f, "", 2L, 2.0f, 
"1"}
+            new Object[]{1L, 1.0f, "", 2L, 2.0f, "1", 1L, 3.0f, "2", 2L, 4.0f, 
"21"}
         )
     );
   }
@@ -657,12 +657,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         "SELECT "
         + "LATEST(cnt), LATEST(m1), LATEST(dim1, 10), "
         + "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS 
VARCHAR), 10), "
-        + "LATEST(cnt, m1), LATEST(m1, m1), LATEST(dim1, 10, m1), "
-        + "LATEST(cnt + 1, m1), LATEST(m1 + 1, m1), LATEST(dim1 || CAST(cnt AS 
VARCHAR), 10, m1) "
-        + "FROM druid.foo",
+        + "LATEST_BY(cnt, MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(m1, 
MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(dim1, MILLIS_TO_TIMESTAMP(l1), 10), "
+        + "LATEST_BY(cnt + 1, MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(m1 + 1, 
MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(dim1 || CAST(cnt AS VARCHAR), 
MILLIS_TO_TIMESTAMP(l1), 10) "
+        + "FROM druid.numfoo",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
-                  .dataSource(CalciteTests.DATASOURCE1)
+                  .dataSource(CalciteTests.DATASOURCE3)
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .granularity(Granularities.ALL)
                   .virtualColumns(
@@ -678,23 +678,49 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                           new LongLastAggregatorFactory("a3", "v0", null),
                           new FloatLastAggregatorFactory("a4", "v1", null),
                           new StringLastAggregatorFactory("a5", "v2", null, 
10),
-                          new LongLastAggregatorFactory("a6", "cnt", "m1"),
-                          new FloatLastAggregatorFactory("a7", "m1", "m1"),
-                          new StringLastAggregatorFactory("a8", "dim1", "m1", 
10),
-                          new LongLastAggregatorFactory("a9", "v0", "m1"),
-                          new FloatLastAggregatorFactory("a10", "v1", "m1"),
-                          new StringLastAggregatorFactory("a11", "v2", "m1", 
10)
+                          new LongLastAggregatorFactory("a6", "cnt", "l1"),
+                          new FloatLastAggregatorFactory("a7", "m1", "l1"),
+                          new StringLastAggregatorFactory("a8", "dim1", "l1", 
10),
+                          new LongLastAggregatorFactory("a9", "v0", "l1"),
+                          new FloatLastAggregatorFactory("a10", "v1", "l1"),
+                          new StringLastAggregatorFactory("a11", "v2", "l1", 
10)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
                   .build()
         ),
         ImmutableList.of(
-            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1", 1L, 6.0f, "abc", 
2L, 7.0f, "abc1"}
+            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1", 1L, 2.0f, "10.1", 
2L, 3.0f, "10.11"}
         )
     );
   }
 
+  @Test
+  public void testEarliestByInvalidTimestamp() throws Exception
+  {
+    expectedException.expect(SqlPlanningException.class);
+    expectedException.expectMessage("Cannot apply 'EARLIEST_BY' to arguments 
of type 'EARLIEST_BY(<FLOAT>, <BIGINT>)");
+
+    testQuery(
+        "SELECT EARLIEST_BY(m1, l1) FROM druid.numfoo",
+        ImmutableList.of(),
+        ImmutableList.of()
+    );
+  }
+
+  @Test
+  public void testLatestByInvalidTimestamp() throws Exception
+  {
+    expectedException.expect(SqlPlanningException.class);
+    expectedException.expectMessage("Cannot apply 'LATEST_BY' to arguments of 
type 'LATEST_BY(<FLOAT>, <BIGINT>)");
+
+    testQuery(
+        "SELECT LATEST_BY(m1, l1) FROM druid.numfoo",
+        ImmutableList.of(),
+        ImmutableList.of()
+    );
+  }
+
   // This test the on-heap version of the AnyAggregator 
(Double/Float/Long/String)
   @Test
   public void testAnyAggregator() throws Exception

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to