This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/29.0.1 by this push:
new 44d0d3245c0 [Backport] Handling latest_by and earliest_by on numeric
columns correctly #15939
44d0d3245c0 is described below
commit 44d0d3245c063e3c9d7a3805632fa1295143b115
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Mar 19 15:31:49 2024 +0530
[Backport] Handling latest_by and earliest_by on numeric columns correctly
#15939
Co-authored-by: Soumyava <[email protected]>
---
.../aggregation/first/NumericFirstAggregator.java | 8 +-
.../first/NumericFirstBufferAggregator.java | 8 +-
.../aggregation/last/NumericLastAggregator.java | 9 +-
.../last/NumericLastBufferAggregator.java | 8 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 508 +++++++++++----------
5 files changed, 286 insertions(+), 255 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
index b3092377b57..6b32996b4f2 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
@@ -62,10 +62,6 @@ public abstract class NumericFirstAggregator implements
Aggregator
@Override
public void aggregate()
{
- if (timeSelector.isNull()) {
- return;
- }
-
if (needsFoldCheck) {
final Object object = valueSelector.getObject();
if (object instanceof SerializablePair) {
@@ -84,6 +80,10 @@ public abstract class NumericFirstAggregator implements
Aggregator
}
}
+ if (timeSelector.isNull()) {
+ return;
+ }
+
long time = timeSelector.getLong();
if (time < firstTime) {
firstTime = time;
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
index 4531ee71bcd..f20456d3122 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
@@ -97,10 +97,6 @@ public abstract class NumericFirstBufferAggregator
implements BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
- if (timeSelector.isNull()) {
- return;
- }
-
long firstTime = buf.getLong(position);
if (needsFoldCheck) {
final Object object = valueSelector.getObject();
@@ -117,6 +113,10 @@ public abstract class NumericFirstBufferAggregator
implements BufferAggregator
}
}
+ if (timeSelector.isNull()) {
+ return;
+ }
+
long time = timeSelector.getLong();
if (time < firstTime) {
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
index 159939450ee..50d4470fa54 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
@@ -61,10 +61,6 @@ public abstract class NumericLastAggregator implements
Aggregator
@Override
public void aggregate()
{
- if (timeSelector.isNull()) {
- return;
- }
-
if (needsFoldCheck) {
final Object object = valueSelector.getObject();
if (object instanceof SerializablePair) {
@@ -83,6 +79,11 @@ public abstract class NumericLastAggregator implements
Aggregator
return;
}
}
+
+ if (timeSelector.isNull()) {
+ return;
+ }
+
long time = timeSelector.getLong();
if (time >= lastTime) {
lastTime = time;
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
index 9de6f996887..2ba15a7929d 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
@@ -100,10 +100,6 @@ public abstract class NumericLastBufferAggregator
implements BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
- if (timeSelector.isNull()) {
- return;
- }
-
long lastTime = buf.getLong(position);
if (needsFoldCheck) {
final Object object = valueSelector.getObject();
@@ -121,6 +117,10 @@ public abstract class NumericLastBufferAggregator
implements BufferAggregator
}
}
+ if (timeSelector.isNull()) {
+ return;
+ }
+
long time = timeSelector.getLong();
if (time >= lastTime) {
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 503cf436fe1..8df5d32ddcd 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -14818,12 +14818,12 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
"select DATE_TRUNC('HOUR', __time), COUNT(*) from druid.foo group by
DATE_TRUNC('HOUR', __time)",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.HOUR)
- .aggregators(aggregators(new
CountAggregatorFactory("a0")))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.HOUR)
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
),
ImmutableList.of(
new Object[]{946684800000L, 1L},
@@ -14845,22 +14845,22 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
"SELECT dim2,LATEST(dim3),LATEST_BY(dim1,
__time),EARLIEST(dim3),EARLIEST_BY(dim1, __time),ANY_VALUE(dim3) FROM druid.foo
where dim2='abc' group by 1",
ImmutableList.of(
GroupByQuery.builder()
- .setDataSource(CalciteTests.DATASOURCE1)
- .setInterval(querySegmentSpec(Filtration.eternity()))
- .setGranularity(Granularities.ALL)
- .setVirtualColumns(
- expressionVirtualColumn("v0", "'abc'", ColumnType.STRING))
- .setDimFilter(equality("dim2", "abc", ColumnType.STRING))
- .setDimensions(
- dimensions(new DefaultDimensionSpec("v0", "d0",
ColumnType.STRING)))
- .setAggregatorSpecs(
- aggregators(
- new StringLastAggregatorFactory("a0", "dim3",
"__time", 1024),
- new StringLastAggregatorFactory("a1", "dim1",
"__time", 1024),
- new StringFirstAggregatorFactory("a2", "dim3",
"__time", 1024),
- new StringFirstAggregatorFactory("a3", "dim1",
"__time", 1024),
- new StringAnyAggregatorFactory("a4", "dim3", 1024,
true)))
- .build()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setVirtualColumns(
+ expressionVirtualColumn("v0", "'abc'",
ColumnType.STRING))
+ .setDimFilter(equality("dim2", "abc",
ColumnType.STRING))
+ .setDimensions(
+ dimensions(new DefaultDimensionSpec("v0", "d0",
ColumnType.STRING)))
+ .setAggregatorSpecs(
+ aggregators(
+ new StringLastAggregatorFactory("a0", "dim3",
"__time", 1024),
+ new StringLastAggregatorFactory("a1", "dim1",
"__time", 1024),
+ new StringFirstAggregatorFactory("a2", "dim3",
"__time", 1024),
+ new StringFirstAggregatorFactory("a3", "dim1",
"__time", 1024),
+ new StringAnyAggregatorFactory("a4", "dim3",
1024, true)))
+ .build()
),
ImmutableList.of(
@@ -14954,63 +14954,63 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
testBuilder()
.sql(
"with t AS (SELECT m2, COUNT(m1) as trend_score\n"
- + "FROM \"foo\"\n"
- + "GROUP BY 1 \n"
- + "LIMIT 10\n"
- + ")\n"
- + "select m2, (MAX(trend_score)) from t\n"
- + "where m2 > 2\n"
- + "GROUP BY 1 \n"
- + "ORDER BY 2 DESC"
+ + "FROM \"foo\"\n"
+ + "GROUP BY 1 \n"
+ + "LIMIT 10\n"
+ + ")\n"
+ + "select m2, (MAX(trend_score)) from t\n"
+ + "where m2 > 2\n"
+ + "GROUP BY 1 \n"
+ + "ORDER BY 2 DESC"
)
.expectedQuery(
WindowOperatorQueryBuilder.builder()
- .setDataSource(
- new TopNQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .dimension(new DefaultDimensionSpec("m2", "d0",
ColumnType.DOUBLE))
- .threshold(10)
- .aggregators(
- aggregators(
- useDefault
- ? new CountAggregatorFactory("a0")
- : new FilteredAggregatorFactory(
- new CountAggregatorFactory("a0"),
- notNull("m1")
- )
- )
- )
- .metric(new DimensionTopNMetricSpec(null,
StringComparators.NUMERIC))
- .context(OUTER_LIMIT_CONTEXT)
- .build()
- )
- .setSignature(
- RowSignature.builder()
- .add("d0", ColumnType.DOUBLE)
- .add("a0", ColumnType.LONG)
- .build()
- )
- .setOperators(
- OperatorFactoryBuilders.naiveSortOperator("a0",
ColumnWithDirection.Direction.DESC)
- )
- .setLeafOperators(
- OperatorFactoryBuilders.scanOperatorFactoryBuilder()
- .setOffsetLimit(0, Long.MAX_VALUE)
- .setFilter(
- range(
- "d0",
- ColumnType.LONG,
- 2L,
- null,
- true,
- false
- )
- )
- .setProjectedColumns("a0", "d0")
- .build()
- )
- .build()
+ .setDataSource(
+ new TopNQueryBuilder()
+
.dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new
DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE))
+ .threshold(10)
+ .aggregators(
+ aggregators(
+ useDefault
+ ? new
CountAggregatorFactory("a0")
+ : new
FilteredAggregatorFactory(
+ new
CountAggregatorFactory("a0"),
+ notNull("m1")
+ )
+ )
+ )
+ .metric(new
DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+ .context(OUTER_LIMIT_CONTEXT)
+ .build()
+ )
+ .setSignature(
+ RowSignature.builder()
+ .add("d0",
ColumnType.DOUBLE)
+ .add("a0",
ColumnType.LONG)
+ .build()
+ )
+ .setOperators(
+
OperatorFactoryBuilders.naiveSortOperator("a0",
ColumnWithDirection.Direction.DESC)
+ )
+ .setLeafOperators(
+
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+
.setOffsetLimit(0, Long.MAX_VALUE)
+ .setFilter(
+ range(
+ "d0",
+
ColumnType.LONG,
+ 2L,
+ null,
+ true,
+ false
+ )
+ )
+
.setProjectedColumns("a0", "d0")
+ .build()
+ )
+ .build()
)
.expectedResults(
ImmutableList.of(
@@ -15029,14 +15029,14 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
skipVectorize();
cannotVectorize();
String sql = "with t AS (SELECT m2 as mo, COUNT(m1) as trend_score\n"
- + "FROM \"foo\"\n"
- + "GROUP BY 1\n"
- + "ORDER BY trend_score DESC\n"
- + "LIMIT 10)\n"
- + "select mo, (MAX(trend_score)) from t\n"
- + "where mo > 2\n"
- + "GROUP BY 1 \n"
- + "ORDER BY 2 DESC LIMIT 2 OFFSET 1\n";
+ + "FROM \"foo\"\n"
+ + "GROUP BY 1\n"
+ + "ORDER BY trend_score DESC\n"
+ + "LIMIT 10)\n"
+ + "select mo, (MAX(trend_score)) from t\n"
+ + "where mo > 2\n"
+ + "GROUP BY 1 \n"
+ + "ORDER BY 2 DESC LIMIT 2 OFFSET 1\n";
ImmutableList<Object[]> expectedResults = ImmutableList.of(
new Object[] {4.0D, 1L},
new Object[] {5.0D, 1L}
@@ -15046,55 +15046,55 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
.sql(sql)
.expectedQuery(
WindowOperatorQueryBuilder.builder()
- .setDataSource(
- new TopNQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .dimension(new DefaultDimensionSpec("m2", "d0",
ColumnType.DOUBLE))
- .threshold(10)
- .aggregators(
- aggregators(
- useDefault
- ? new CountAggregatorFactory("a0")
- : new FilteredAggregatorFactory(
- new CountAggregatorFactory("a0"),
- notNull("m1")
- )
- )
- )
- .metric(new NumericTopNMetricSpec("a0"))
- .context(OUTER_LIMIT_CONTEXT)
- .build()
- )
- .setSignature(
- RowSignature.builder()
- .add("d0", ColumnType.DOUBLE)
- .add("a0", ColumnType.LONG)
- .build()
- )
- .setOperators(
- OperatorFactoryBuilders.naiveSortOperator("a0",
ColumnWithDirection.Direction.DESC),
- OperatorFactoryBuilders.scanOperatorFactoryBuilder()
- .setOffsetLimit(1, 2)
- .build()
- )
- .setLeafOperators(
- OperatorFactoryBuilders.scanOperatorFactoryBuilder()
- .setOffsetLimit(0, Long.MAX_VALUE)
- .setFilter(
- range(
- "d0",
- ColumnType.LONG,
- 2L,
- null,
- true,
- false
- )
- )
- .setProjectedColumns("a0", "d0")
- .build()
- )
- .build()
+ .setDataSource(
+ new TopNQueryBuilder()
+
.dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new
DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE))
+ .threshold(10)
+ .aggregators(
+ aggregators(
+ useDefault
+ ? new
CountAggregatorFactory("a0")
+ : new
FilteredAggregatorFactory(
+ new
CountAggregatorFactory("a0"),
+ notNull("m1")
+ )
+ )
+ )
+ .metric(new
NumericTopNMetricSpec("a0"))
+ .context(OUTER_LIMIT_CONTEXT)
+ .build()
+ )
+ .setSignature(
+ RowSignature.builder()
+ .add("d0",
ColumnType.DOUBLE)
+ .add("a0",
ColumnType.LONG)
+ .build()
+ )
+ .setOperators(
+
OperatorFactoryBuilders.naiveSortOperator("a0",
ColumnWithDirection.Direction.DESC),
+
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+
.setOffsetLimit(1, 2)
+ .build()
+ )
+ .setLeafOperators(
+
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+
.setOffsetLimit(0, Long.MAX_VALUE)
+ .setFilter(
+ range(
+ "d0",
+
ColumnType.LONG,
+ 2L,
+ null,
+ true,
+ false
+ )
+ )
+
.setProjectedColumns("a0", "d0")
+ .build()
+ )
+ .build()
)
.expectedResults(expectedResults)
.run();
@@ -15108,14 +15108,14 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
cannotVectorize();
msqIncompatible();
String sql = "with t AS (\n"
- + "SELECT \n"
- + " RANK() OVER (PARTITION BY m2 ORDER BY m2 ASC) \n"
- + " AS ranking,\n"
- + " COUNT(m1) as trend_score\n"
- + "FROM foo\n"
- + "GROUP BY m2,m1 LIMIT 10\n"
- + ")\n"
- + "select ranking, trend_score from t ORDER BY trend_score";
+ + "SELECT \n"
+ + " RANK() OVER (PARTITION BY m2 ORDER BY m2 ASC) \n"
+ + " AS ranking,\n"
+ + " COUNT(m1) as trend_score\n"
+ + "FROM foo\n"
+ + "GROUP BY m2,m1 LIMIT 10\n"
+ + ")\n"
+ + "select ranking, trend_score from t ORDER BY trend_score";
ImmutableList<Object[]> expectedResults = ImmutableList.of(
new Object[] {1L, 1L},
new Object[] {1L, 1L},
@@ -15130,71 +15130,71 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS,
true))
.expectedQuery(
WindowOperatorQueryBuilder.builder()
- .setDataSource(
- Druids.newScanQueryBuilder()
- .dataSource(
- new WindowOperatorQueryBuilder()
- .setDataSource(
- GroupByQuery.builder()
-
.setDataSource(CalciteTests.DATASOURCE1)
-
.setInterval(querySegmentSpec(Filtration.eternity()))
- .setGranularity(Granularities.ALL)
- .setDimensions(
- dimensions(
- new DefaultDimensionSpec("m2",
"d0", ColumnType.DOUBLE),
- new DefaultDimensionSpec("m1",
"d1", ColumnType.FLOAT)
- )
- )
- .setAggregatorSpecs(
- aggregators(
- useDefault
- ? new
CountAggregatorFactory("a0")
- : new
FilteredAggregatorFactory(
- new
CountAggregatorFactory("a0"),
- notNull("m1")
- )
- )
- )
- .build()
- )
- .setOperators(
-
OperatorFactoryBuilders.naivePartitionOperator("d0"),
- OperatorFactoryBuilders.windowOperators(
-
OperatorFactoryBuilders.rankProcessor("w0", "d0")
- )
- )
- .setSignature(
- RowSignature.builder()
- .add("w0", ColumnType.LONG)
- .add("a0", ColumnType.LONG)
- .build()
- )
- .build()
- )
- .intervals(querySegmentSpec(Filtration.eternity()))
- .columns("a0", "w0")
- .context(QUERY_CONTEXT_DEFAULT)
-
.resultFormat(ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
- .legacy(false)
- .limit(10)
- .build()
- )
- .setSignature(
- RowSignature.builder()
- .add("w0", ColumnType.LONG)
- .add("a0", ColumnType.LONG)
- .build()
- )
- .setOperators(
- OperatorFactoryBuilders.naiveSortOperator("a0",
ColumnWithDirection.Direction.ASC)
- )
- .setLeafOperators(
- OperatorFactoryBuilders.scanOperatorFactoryBuilder()
- .setOffsetLimit(0, Long.MAX_VALUE)
- .setProjectedColumns("a0", "w0")
- .build()
- )
- .build()
+ .setDataSource(
+ Druids.newScanQueryBuilder()
+ .dataSource(
+ new
WindowOperatorQueryBuilder()
+ .setDataSource(
+
GroupByQuery.builder()
+
.setDataSource(CalciteTests.DATASOURCE1)
+
.setInterval(querySegmentSpec(Filtration.eternity()))
+
.setGranularity(Granularities.ALL)
+
.setDimensions(
+
dimensions(
+
new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE),
+
new DefaultDimensionSpec("m1", "d1", ColumnType.FLOAT)
+ )
+ )
+
.setAggregatorSpecs(
+
aggregators(
+
useDefault
+
? new CountAggregatorFactory("a0")
+
: new FilteredAggregatorFactory(
+
new CountAggregatorFactory("a0"),
+
notNull("m1")
+
)
+ )
+ )
+
.build()
+ )
+ .setOperators(
+
OperatorFactoryBuilders.naivePartitionOperator("d0"),
+
OperatorFactoryBuilders.windowOperators(
+
OperatorFactoryBuilders.rankProcessor("w0", "d0")
+ )
+ )
+ .setSignature(
+
RowSignature.builder()
+
.add("w0", ColumnType.LONG)
+
.add("a0", ColumnType.LONG)
+
.build()
+ )
+ .build()
+ )
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("a0", "w0")
+ .context(QUERY_CONTEXT_DEFAULT)
+
.resultFormat(ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .legacy(false)
+ .limit(10)
+ .build()
+ )
+ .setSignature(
+ RowSignature.builder()
+ .add("w0",
ColumnType.LONG)
+ .add("a0",
ColumnType.LONG)
+ .build()
+ )
+ .setOperators(
+
OperatorFactoryBuilders.naiveSortOperator("a0",
ColumnWithDirection.Direction.ASC)
+ )
+ .setLeafOperators(
+
OperatorFactoryBuilders.scanOperatorFactoryBuilder()
+
.setOffsetLimit(0, Long.MAX_VALUE)
+
.setProjectedColumns("a0", "w0")
+ .build()
+ )
+ .build()
)
.expectedResults(expectedResults)
.run();
@@ -15209,11 +15209,11 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
testBuilder()
.sql(
"SELECT\n"
- + " FLOOR(__time TO DAY) t,\n"
- + " SUM(cnt) c,\n"
- + " SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc\n"
- + "FROM foo\n"
- + "GROUP BY FLOOR(__time TO DAY)"
+ + " FLOOR(__time TO DAY) t,\n"
+ + " SUM(cnt) c,\n"
+ + " SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc\n"
+ + "FROM foo\n"
+ + "GROUP BY FLOOR(__time TO DAY)"
)
.queryContext(
ImmutableMap.of(
@@ -15223,34 +15223,34 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
)
.expectedQuery(
WindowOperatorQueryBuilder.builder()
- .setDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.DAY)
- .aggregators(
- new LongSumAggregatorFactory("a0", "cnt")
- )
- .context(OUTER_LIMIT_CONTEXT)
- .build()
- )
- .setSignature(
- RowSignature.builder()
- .add("d0", ColumnType.LONG)
- .add("a0", ColumnType.LONG)
- .add("w0", ColumnType.LONG)
- .build()
- )
- .setOperators(
- OperatorFactoryBuilders.naivePartitionOperator(),
- OperatorFactoryBuilders.windowOperators(
- OperatorFactoryBuilders.framedAggregateProcessor(
-
WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")),
- new LongSumAggregatorFactory("w0", "a0")
- )
- )
- )
- .build()
+ .setDataSource(
+ Druids.newTimeseriesQueryBuilder()
+
.dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.DAY)
+ .aggregators(
+ new
LongSumAggregatorFactory("a0", "cnt")
+ )
+ .context(OUTER_LIMIT_CONTEXT)
+ .build()
+ )
+ .setSignature(
+ RowSignature.builder()
+ .add("d0",
ColumnType.LONG)
+ .add("a0",
ColumnType.LONG)
+ .add("w0",
ColumnType.LONG)
+ .build()
+ )
+ .setOperators(
+
OperatorFactoryBuilders.naivePartitionOperator(),
+
OperatorFactoryBuilders.windowOperators(
+
OperatorFactoryBuilders.framedAggregateProcessor(
+
WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")),
+ new
LongSumAggregatorFactory("w0", "a0")
+ )
+ )
+ )
+ .build()
)
.expectedResults(
ImmutableList.of(
@@ -15264,4 +15264,34 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
)
.run();
}
+
+ @Test
+ public void testLatestByAggregatorOnSecondaryTimestampGroupBy()
+ {
+ msqIncompatible();
+ testQuery(
+ "SELECT __time, m1, LATEST_BY(m1, MILLIS_TO_TIMESTAMP(CAST(m2 AS
NUMERIC))) from druid.numfoo GROUP BY 1,2",
+ ImmutableList.of(
+ new GroupByQuery.Builder()
+ .setDataSource(CalciteTests.DATASOURCE3)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(
+ new DefaultDimensionSpec("__time", "_d0", ColumnType.LONG),
+ new DefaultDimensionSpec("m1", "_d1", ColumnType.FLOAT)
+ )
+ .setAggregatorSpecs(aggregators(new
FloatLastAggregatorFactory("a0", "m1", "m2")))
+ .setContext(OUTER_LIMIT_CONTEXT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{946684800000L, 1.0F, 1.0F},
+ new Object[]{946771200000L, 2.0F, 2.0F},
+ new Object[]{946857600000L, 3.0F, 3.0F},
+ new Object[]{978307200000L, 4.0F, 4.0F},
+ new Object[]{978393600000L, 5.0F, 5.0F},
+ new Object[]{978480000000L, 6.0F, 6.0F}
+ )
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]