This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.1 by this push:
     new 44d0d3245c0 [Backport] Handling latest_by and earliest_by on numeric 
columns correctly #15939
44d0d3245c0 is described below

commit 44d0d3245c063e3c9d7a3805632fa1295143b115
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Mar 19 15:31:49 2024 +0530

    [Backport] Handling latest_by and earliest_by on numeric columns correctly 
#15939
    
    Co-authored-by: Soumyava <[email protected]>
---
 .../aggregation/first/NumericFirstAggregator.java  |   8 +-
 .../first/NumericFirstBufferAggregator.java        |   8 +-
 .../aggregation/last/NumericLastAggregator.java    |   9 +-
 .../last/NumericLastBufferAggregator.java          |   8 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 508 +++++++++++----------
 5 files changed, 286 insertions(+), 255 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
index b3092377b57..6b32996b4f2 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
@@ -62,10 +62,6 @@ public abstract class NumericFirstAggregator implements 
Aggregator
   @Override
   public void aggregate()
   {
-    if (timeSelector.isNull()) {
-      return;
-    }
-
     if (needsFoldCheck) {
       final Object object = valueSelector.getObject();
       if (object instanceof SerializablePair) {
@@ -84,6 +80,10 @@ public abstract class NumericFirstAggregator implements 
Aggregator
       }
     }
 
+    if (timeSelector.isNull()) {
+      return;
+    }
+
     long time = timeSelector.getLong();
     if (time < firstTime) {
       firstTime = time;
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
index 4531ee71bcd..f20456d3122 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
@@ -97,10 +97,6 @@ public abstract class NumericFirstBufferAggregator 
implements BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    if (timeSelector.isNull()) {
-      return;
-    }
-
     long firstTime = buf.getLong(position);
     if (needsFoldCheck) {
       final Object object = valueSelector.getObject();
@@ -117,6 +113,10 @@ public abstract class NumericFirstBufferAggregator 
implements BufferAggregator
       }
     }
 
+    if (timeSelector.isNull()) {
+      return;
+    }
+
     long time = timeSelector.getLong();
 
     if (time < firstTime) {
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
index 159939450ee..50d4470fa54 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
@@ -61,10 +61,6 @@ public abstract class NumericLastAggregator implements 
Aggregator
   @Override
   public void aggregate()
   {
-    if (timeSelector.isNull()) {
-      return;
-    }
-
     if (needsFoldCheck) {
       final Object object = valueSelector.getObject();
       if (object instanceof SerializablePair) {
@@ -83,6 +79,11 @@ public abstract class NumericLastAggregator implements 
Aggregator
         return;
       }
     }
+
+    if (timeSelector.isNull()) {
+      return;
+    }
+
     long time = timeSelector.getLong();
     if (time >= lastTime) {
       lastTime = time;
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
index 9de6f996887..2ba15a7929d 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
@@ -100,10 +100,6 @@ public abstract class NumericLastBufferAggregator 
implements BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    if (timeSelector.isNull()) {
-      return;
-    }
-
     long lastTime = buf.getLong(position);
     if (needsFoldCheck) {
       final Object object = valueSelector.getObject();
@@ -121,6 +117,10 @@ public abstract class NumericLastBufferAggregator 
implements BufferAggregator
       }
     }
 
+    if (timeSelector.isNull()) {
+      return;
+    }
+
     long time = timeSelector.getLong();
 
     if (time >= lastTime) {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 503cf436fe1..8df5d32ddcd 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -14818,12 +14818,12 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         "select DATE_TRUNC('HOUR', __time), COUNT(*) from druid.foo group by 
DATE_TRUNC('HOUR', __time)",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
-                           .dataSource(CalciteTests.DATASOURCE1)
-                           .intervals(querySegmentSpec(Filtration.eternity()))
-                           .granularity(Granularities.HOUR)
-                           .aggregators(aggregators(new 
CountAggregatorFactory("a0")))
-                           .context(QUERY_CONTEXT_DEFAULT)
-                           .build()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.HOUR)
+                  .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
         ),
         ImmutableList.of(
             new Object[]{946684800000L, 1L},
@@ -14845,22 +14845,22 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         "SELECT dim2,LATEST(dim3),LATEST_BY(dim1, 
__time),EARLIEST(dim3),EARLIEST_BY(dim1, __time),ANY_VALUE(dim3) FROM druid.foo 
where dim2='abc' group by 1",
         ImmutableList.of(
             GroupByQuery.builder()
-                .setDataSource(CalciteTests.DATASOURCE1)
-                .setInterval(querySegmentSpec(Filtration.eternity()))
-                .setGranularity(Granularities.ALL)
-                .setVirtualColumns(
-                    expressionVirtualColumn("v0", "'abc'", ColumnType.STRING))
-                .setDimFilter(equality("dim2", "abc", ColumnType.STRING))
-                .setDimensions(
-                    dimensions(new DefaultDimensionSpec("v0", "d0", 
ColumnType.STRING)))
-                .setAggregatorSpecs(
-                    aggregators(
-                        new StringLastAggregatorFactory("a0", "dim3", 
"__time", 1024),
-                        new StringLastAggregatorFactory("a1", "dim1", 
"__time", 1024),
-                        new StringFirstAggregatorFactory("a2", "dim3", 
"__time", 1024),
-                        new StringFirstAggregatorFactory("a3", "dim1", 
"__time", 1024),
-                        new StringAnyAggregatorFactory("a4", "dim3", 1024, 
true)))
-                .build()
+                        .setDataSource(CalciteTests.DATASOURCE1)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn("v0", "'abc'", 
ColumnType.STRING))
+                        .setDimFilter(equality("dim2", "abc", 
ColumnType.STRING))
+                        .setDimensions(
+                            dimensions(new DefaultDimensionSpec("v0", "d0", 
ColumnType.STRING)))
+                        .setAggregatorSpecs(
+                            aggregators(
+                                new StringLastAggregatorFactory("a0", "dim3", 
"__time", 1024),
+                                new StringLastAggregatorFactory("a1", "dim1", 
"__time", 1024),
+                                new StringFirstAggregatorFactory("a2", "dim3", 
"__time", 1024),
+                                new StringFirstAggregatorFactory("a3", "dim1", 
"__time", 1024),
+                                new StringAnyAggregatorFactory("a4", "dim3", 
1024, true)))
+                        .build()
 
         ),
         ImmutableList.of(
@@ -14954,63 +14954,63 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     testBuilder()
         .sql(
             "with t AS (SELECT m2, COUNT(m1) as trend_score\n"
-                + "FROM \"foo\"\n"
-                + "GROUP BY 1 \n"
-                + "LIMIT 10\n"
-                + ")\n"
-                + "select m2, (MAX(trend_score)) from t\n"
-                + "where m2 > 2\n"
-                + "GROUP BY 1 \n"
-                + "ORDER BY 2 DESC"
+            + "FROM \"foo\"\n"
+            + "GROUP BY 1 \n"
+            + "LIMIT 10\n"
+            + ")\n"
+            + "select m2, (MAX(trend_score)) from t\n"
+            + "where m2 > 2\n"
+            + "GROUP BY 1 \n"
+            + "ORDER BY 2 DESC"
         )
         .expectedQuery(
             WindowOperatorQueryBuilder.builder()
-                .setDataSource(
-                    new TopNQueryBuilder()
-                        .dataSource(CalciteTests.DATASOURCE1)
-                        .intervals(querySegmentSpec(Filtration.eternity()))
-                        .dimension(new DefaultDimensionSpec("m2", "d0", 
ColumnType.DOUBLE))
-                        .threshold(10)
-                        .aggregators(
-                            aggregators(
-                                useDefault
-                                    ? new CountAggregatorFactory("a0")
-                                    : new FilteredAggregatorFactory(
-                                        new CountAggregatorFactory("a0"),
-                                        notNull("m1")
-                                    )
-                            )
-                        )
-                        .metric(new DimensionTopNMetricSpec(null, 
StringComparators.NUMERIC))
-                        .context(OUTER_LIMIT_CONTEXT)
-                        .build()
-                )
-                .setSignature(
-                    RowSignature.builder()
-                        .add("d0", ColumnType.DOUBLE)
-                        .add("a0", ColumnType.LONG)
-                        .build()
-                )
-                .setOperators(
-                    OperatorFactoryBuilders.naiveSortOperator("a0", 
ColumnWithDirection.Direction.DESC)
-                )
-                .setLeafOperators(
-                    OperatorFactoryBuilders.scanOperatorFactoryBuilder()
-                        .setOffsetLimit(0, Long.MAX_VALUE)
-                        .setFilter(
-                            range(
-                                "d0",
-                                ColumnType.LONG,
-                                2L,
-                                null,
-                                true,
-                                false
-                            )
-                        )
-                        .setProjectedColumns("a0", "d0")
-                        .build()
-                )
-                .build()
+                                      .setDataSource(
+                                          new TopNQueryBuilder()
+                                              
.dataSource(CalciteTests.DATASOURCE1)
+                                              
.intervals(querySegmentSpec(Filtration.eternity()))
+                                              .dimension(new 
DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE))
+                                              .threshold(10)
+                                              .aggregators(
+                                                  aggregators(
+                                                      useDefault
+                                                      ? new 
CountAggregatorFactory("a0")
+                                                      : new 
FilteredAggregatorFactory(
+                                                          new 
CountAggregatorFactory("a0"),
+                                                          notNull("m1")
+                                                      )
+                                                  )
+                                              )
+                                              .metric(new 
DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+                                              .context(OUTER_LIMIT_CONTEXT)
+                                              .build()
+                                      )
+                                      .setSignature(
+                                          RowSignature.builder()
+                                                      .add("d0", 
ColumnType.DOUBLE)
+                                                      .add("a0", 
ColumnType.LONG)
+                                                      .build()
+                                      )
+                                      .setOperators(
+                                          
OperatorFactoryBuilders.naiveSortOperator("a0", 
ColumnWithDirection.Direction.DESC)
+                                      )
+                                      .setLeafOperators(
+                                          
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+                                                                 
.setOffsetLimit(0, Long.MAX_VALUE)
+                                                                 .setFilter(
+                                                                     range(
+                                                                         "d0",
+                                                                         
ColumnType.LONG,
+                                                                         2L,
+                                                                         null,
+                                                                         true,
+                                                                         false
+                                                                     )
+                                                                 )
+                                                                 
.setProjectedColumns("a0", "d0")
+                                                                 .build()
+                                      )
+                                      .build()
         )
         .expectedResults(
             ImmutableList.of(
@@ -15029,14 +15029,14 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     skipVectorize();
     cannotVectorize();
     String sql = "with t AS (SELECT m2 as mo, COUNT(m1) as trend_score\n"
-        + "FROM \"foo\"\n"
-        + "GROUP BY 1\n"
-        + "ORDER BY trend_score DESC\n"
-        + "LIMIT 10)\n"
-        + "select mo, (MAX(trend_score)) from t\n"
-        + "where mo > 2\n"
-        + "GROUP BY 1 \n"
-        + "ORDER BY 2 DESC  LIMIT 2 OFFSET 1\n";
+                 + "FROM \"foo\"\n"
+                 + "GROUP BY 1\n"
+                 + "ORDER BY trend_score DESC\n"
+                 + "LIMIT 10)\n"
+                 + "select mo, (MAX(trend_score)) from t\n"
+                 + "where mo > 2\n"
+                 + "GROUP BY 1 \n"
+                 + "ORDER BY 2 DESC  LIMIT 2 OFFSET 1\n";
     ImmutableList<Object[]> expectedResults = ImmutableList.of(
         new Object[] {4.0D, 1L},
         new Object[] {5.0D, 1L}
@@ -15046,55 +15046,55 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         .sql(sql)
         .expectedQuery(
             WindowOperatorQueryBuilder.builder()
-                .setDataSource(
-                    new TopNQueryBuilder()
-                        .dataSource(CalciteTests.DATASOURCE1)
-                        .intervals(querySegmentSpec(Filtration.eternity()))
-                        .dimension(new DefaultDimensionSpec("m2", "d0", 
ColumnType.DOUBLE))
-                        .threshold(10)
-                        .aggregators(
-                            aggregators(
-                                useDefault
-                                    ? new CountAggregatorFactory("a0")
-                                    : new FilteredAggregatorFactory(
-                                        new CountAggregatorFactory("a0"),
-                                        notNull("m1")
-                                    )
-                            )
-                        )
-                        .metric(new NumericTopNMetricSpec("a0"))
-                        .context(OUTER_LIMIT_CONTEXT)
-                        .build()
-                )
-                .setSignature(
-                    RowSignature.builder()
-                        .add("d0", ColumnType.DOUBLE)
-                        .add("a0", ColumnType.LONG)
-                        .build()
-                )
-                .setOperators(
-                    OperatorFactoryBuilders.naiveSortOperator("a0", 
ColumnWithDirection.Direction.DESC),
-                    OperatorFactoryBuilders.scanOperatorFactoryBuilder()
-                        .setOffsetLimit(1, 2)
-                        .build()
-                )
-                .setLeafOperators(
-                    OperatorFactoryBuilders.scanOperatorFactoryBuilder()
-                        .setOffsetLimit(0, Long.MAX_VALUE)
-                        .setFilter(
-                            range(
-                                "d0",
-                                ColumnType.LONG,
-                                2L,
-                                null,
-                                true,
-                                false
-                            )
-                        )
-                        .setProjectedColumns("a0", "d0")
-                        .build()
-                )
-                .build()
+                                      .setDataSource(
+                                          new TopNQueryBuilder()
+                                              
.dataSource(CalciteTests.DATASOURCE1)
+                                              
.intervals(querySegmentSpec(Filtration.eternity()))
+                                              .dimension(new 
DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE))
+                                              .threshold(10)
+                                              .aggregators(
+                                                  aggregators(
+                                                      useDefault
+                                                      ? new 
CountAggregatorFactory("a0")
+                                                      : new 
FilteredAggregatorFactory(
+                                                          new 
CountAggregatorFactory("a0"),
+                                                          notNull("m1")
+                                                      )
+                                                  )
+                                              )
+                                              .metric(new 
NumericTopNMetricSpec("a0"))
+                                              .context(OUTER_LIMIT_CONTEXT)
+                                              .build()
+                                      )
+                                      .setSignature(
+                                          RowSignature.builder()
+                                                      .add("d0", 
ColumnType.DOUBLE)
+                                                      .add("a0", 
ColumnType.LONG)
+                                                      .build()
+                                      )
+                                      .setOperators(
+                                          
OperatorFactoryBuilders.naiveSortOperator("a0", 
ColumnWithDirection.Direction.DESC),
+                                          
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+                                                                 
.setOffsetLimit(1, 2)
+                                                                 .build()
+                                      )
+                                      .setLeafOperators(
+                                          
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+                                                                 
.setOffsetLimit(0, Long.MAX_VALUE)
+                                                                 .setFilter(
+                                                                     range(
+                                                                         "d0",
+                                                                         
ColumnType.LONG,
+                                                                         2L,
+                                                                         null,
+                                                                         true,
+                                                                         false
+                                                                     )
+                                                                 )
+                                                                 
.setProjectedColumns("a0", "d0")
+                                                                 .build()
+                                      )
+                                      .build()
         )
         .expectedResults(expectedResults)
         .run();
@@ -15108,14 +15108,14 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     cannotVectorize();
     msqIncompatible();
     String sql = "with t AS (\n"
-        + "SELECT  \n"
-        + "    RANK() OVER (PARTITION BY m2 ORDER BY m2 ASC) \n"
-        + "      AS ranking,\n"
-        + "    COUNT(m1) as trend_score\n"
-        + "FROM foo\n"
-        + "GROUP BY m2,m1 LIMIT 10\n"
-        + ")\n"
-        + "select ranking, trend_score from t ORDER BY trend_score";
+                 + "SELECT  \n"
+                 + "    RANK() OVER (PARTITION BY m2 ORDER BY m2 ASC) \n"
+                 + "      AS ranking,\n"
+                 + "    COUNT(m1) as trend_score\n"
+                 + "FROM foo\n"
+                 + "GROUP BY m2,m1 LIMIT 10\n"
+                 + ")\n"
+                 + "select ranking, trend_score from t ORDER BY trend_score";
     ImmutableList<Object[]> expectedResults = ImmutableList.of(
         new Object[] {1L, 1L},
         new Object[] {1L, 1L},
@@ -15130,71 +15130,71 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, 
true))
         .expectedQuery(
             WindowOperatorQueryBuilder.builder()
-                .setDataSource(
-                    Druids.newScanQueryBuilder()
-                        .dataSource(
-                            new WindowOperatorQueryBuilder()
-                                .setDataSource(
-                                    GroupByQuery.builder()
-                                        
.setDataSource(CalciteTests.DATASOURCE1)
-                                        
.setInterval(querySegmentSpec(Filtration.eternity()))
-                                        .setGranularity(Granularities.ALL)
-                                        .setDimensions(
-                                            dimensions(
-                                                new DefaultDimensionSpec("m2", 
"d0", ColumnType.DOUBLE),
-                                                new DefaultDimensionSpec("m1", 
"d1", ColumnType.FLOAT)
-                                            )
-                                        )
-                                        .setAggregatorSpecs(
-                                            aggregators(
-                                                useDefault
-                                                    ? new 
CountAggregatorFactory("a0")
-                                                    : new 
FilteredAggregatorFactory(
-                                                        new 
CountAggregatorFactory("a0"),
-                                                        notNull("m1")
-                                                    )
-                                            )
-                                        )
-                                        .build()
-                                )
-                                .setOperators(
-                                    
OperatorFactoryBuilders.naivePartitionOperator("d0"),
-                                    OperatorFactoryBuilders.windowOperators(
-                                        
OperatorFactoryBuilders.rankProcessor("w0", "d0")
-                                    )
-                                )
-                                .setSignature(
-                                    RowSignature.builder()
-                                        .add("w0", ColumnType.LONG)
-                                        .add("a0", ColumnType.LONG)
-                                        .build()
-                                )
-                                .build()
-                        )
-                        .intervals(querySegmentSpec(Filtration.eternity()))
-                        .columns("a0", "w0")
-                        .context(QUERY_CONTEXT_DEFAULT)
-                        
.resultFormat(ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                        .legacy(false)
-                        .limit(10)
-                        .build()
-                )
-                .setSignature(
-                    RowSignature.builder()
-                        .add("w0", ColumnType.LONG)
-                        .add("a0", ColumnType.LONG)
-                        .build()
-                )
-                .setOperators(
-                    OperatorFactoryBuilders.naiveSortOperator("a0", 
ColumnWithDirection.Direction.ASC)
-                )
-                .setLeafOperators(
-                    OperatorFactoryBuilders.scanOperatorFactoryBuilder()
-                        .setOffsetLimit(0, Long.MAX_VALUE)
-                        .setProjectedColumns("a0", "w0")
-                        .build()
-                )
-                .build()
+                                      .setDataSource(
+                                          Druids.newScanQueryBuilder()
+                                                .dataSource(
+                                                    new 
WindowOperatorQueryBuilder()
+                                                        .setDataSource(
+                                                            
GroupByQuery.builder()
+                                                                        
.setDataSource(CalciteTests.DATASOURCE1)
+                                                                        
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                                                        
.setGranularity(Granularities.ALL)
+                                                                        
.setDimensions(
+                                                                            
dimensions(
+                                                                               
 new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE),
+                                                                               
 new DefaultDimensionSpec("m1", "d1", ColumnType.FLOAT)
+                                                                            )
+                                                                        )
+                                                                        
.setAggregatorSpecs(
+                                                                            
aggregators(
+                                                                               
 useDefault
+                                                                               
 ? new CountAggregatorFactory("a0")
+                                                                               
 : new FilteredAggregatorFactory(
+                                                                               
     new CountAggregatorFactory("a0"),
+                                                                               
     notNull("m1")
+                                                                               
 )
+                                                                            )
+                                                                        )
+                                                                        
.build()
+                                                        )
+                                                        .setOperators(
+                                                            
OperatorFactoryBuilders.naivePartitionOperator("d0"),
+                                                            
OperatorFactoryBuilders.windowOperators(
+                                                                
OperatorFactoryBuilders.rankProcessor("w0", "d0")
+                                                            )
+                                                        )
+                                                        .setSignature(
+                                                            
RowSignature.builder()
+                                                                        
.add("w0", ColumnType.LONG)
+                                                                        
.add("a0", ColumnType.LONG)
+                                                                        
.build()
+                                                        )
+                                                        .build()
+                                                )
+                                                
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                .columns("a0", "w0")
+                                                .context(QUERY_CONTEXT_DEFAULT)
+                                                
.resultFormat(ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                .legacy(false)
+                                                .limit(10)
+                                                .build()
+                                      )
+                                      .setSignature(
+                                          RowSignature.builder()
+                                                      .add("w0", 
ColumnType.LONG)
+                                                      .add("a0", 
ColumnType.LONG)
+                                                      .build()
+                                      )
+                                      .setOperators(
+                                          
OperatorFactoryBuilders.naiveSortOperator("a0", 
ColumnWithDirection.Direction.ASC)
+                                      )
+                                      .setLeafOperators(
+                                          
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+                                                                 
.setOffsetLimit(0, Long.MAX_VALUE)
+                                                                 
.setProjectedColumns("a0", "w0")
+                                                                 .build()
+                                      )
+                                      .build()
         )
         .expectedResults(expectedResults)
         .run();
@@ -15209,11 +15209,11 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     testBuilder()
         .sql(
             "SELECT\n"
-                + "  FLOOR(__time TO DAY) t,\n"
-                + "  SUM(cnt) c,\n"
-                + "  SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc\n"
-                + "FROM foo\n"
-                + "GROUP BY FLOOR(__time TO DAY)"
+            + "  FLOOR(__time TO DAY) t,\n"
+            + "  SUM(cnt) c,\n"
+            + "  SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc\n"
+            + "FROM foo\n"
+            + "GROUP BY FLOOR(__time TO DAY)"
         )
         .queryContext(
             ImmutableMap.of(
@@ -15223,34 +15223,34 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         )
         .expectedQuery(
             WindowOperatorQueryBuilder.builder()
-                .setDataSource(
-                    Druids.newTimeseriesQueryBuilder()
-                        .dataSource(CalciteTests.DATASOURCE1)
-                        .intervals(querySegmentSpec(Filtration.eternity()))
-                        .granularity(Granularities.DAY)
-                        .aggregators(
-                            new LongSumAggregatorFactory("a0", "cnt")
-                        )
-                        .context(OUTER_LIMIT_CONTEXT)
-                        .build()
-                )
-                .setSignature(
-                    RowSignature.builder()
-                        .add("d0", ColumnType.LONG)
-                        .add("a0", ColumnType.LONG)
-                        .add("w0", ColumnType.LONG)
-                        .build()
-                )
-                .setOperators(
-                    OperatorFactoryBuilders.naivePartitionOperator(),
-                    OperatorFactoryBuilders.windowOperators(
-                        OperatorFactoryBuilders.framedAggregateProcessor(
-                            
WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")),
-                            new LongSumAggregatorFactory("w0", "a0")
-                        )
-                    )
-                )
-                .build()
+                                      .setDataSource(
+                                          Druids.newTimeseriesQueryBuilder()
+                                                
.dataSource(CalciteTests.DATASOURCE1)
+                                                
.intervals(querySegmentSpec(Filtration.eternity()))
+                                                .granularity(Granularities.DAY)
+                                                .aggregators(
+                                                    new 
LongSumAggregatorFactory("a0", "cnt")
+                                                )
+                                                .context(OUTER_LIMIT_CONTEXT)
+                                                .build()
+                                      )
+                                      .setSignature(
+                                          RowSignature.builder()
+                                                      .add("d0", 
ColumnType.LONG)
+                                                      .add("a0", 
ColumnType.LONG)
+                                                      .add("w0", 
ColumnType.LONG)
+                                                      .build()
+                                      )
+                                      .setOperators(
+                                          
OperatorFactoryBuilders.naivePartitionOperator(),
+                                          
OperatorFactoryBuilders.windowOperators(
+                                              
OperatorFactoryBuilders.framedAggregateProcessor(
+                                                  
WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")),
+                                                  new 
LongSumAggregatorFactory("w0", "a0")
+                                              )
+                                          )
+                                      )
+                                      .build()
         )
         .expectedResults(
             ImmutableList.of(
@@ -15264,4 +15264,34 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         )
         .run();
   }
+
+  @Test
+  public void testLatestByAggregatorOnSecondaryTimestampGroupBy()
+  {
+    msqIncompatible();
+    testQuery(
+        "SELECT __time, m1, LATEST_BY(m1, MILLIS_TO_TIMESTAMP(CAST(m2 AS 
NUMERIC))) from druid.numfoo GROUP BY 1,2",
+        ImmutableList.of(
+            new GroupByQuery.Builder()
+                .setDataSource(CalciteTests.DATASOURCE3)
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setGranularity(Granularities.ALL)
+                .setDimensions(
+                    new DefaultDimensionSpec("__time", "_d0", ColumnType.LONG),
+                    new DefaultDimensionSpec("m1", "_d1", ColumnType.FLOAT)
+                )
+                .setAggregatorSpecs(aggregators(new 
FloatLastAggregatorFactory("a0", "m1", "m2")))
+                .setContext(OUTER_LIMIT_CONTEXT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{946684800000L, 1.0F, 1.0F},
+            new Object[]{946771200000L, 2.0F, 2.0F},
+            new Object[]{946857600000L, 3.0F, 3.0F},
+            new Object[]{978307200000L, 4.0F, 4.0F},
+            new Object[]{978393600000L, 5.0F, 5.0F},
+            new Object[]{978480000000L, 6.0F, 6.0F}
+        )
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to