This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 355c2f5da00 Add sql + ingestion compatibility for first/last on
numeric values (#15607)
355c2f5da00 is described below
commit 355c2f5da003e99439774dbc63ed9ef2016b0d8d
Author: Ankit Kothari <[email protected]>
AuthorDate: Tue Jan 9 23:29:38 2024 -0800
Add sql + ingestion compatibility for first/last on numeric values (#15607)
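SQL EARLIEST/LATEST now work on columns rolled up at ingestion time with the numeric (long, float, double) first/last aggregators, returning BIGINT, FLOAT or DOUBLE respectively.
The ingestion UI now also offers the first and last aggregators (numeric and string) as metric types.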
---
.../druid/msq/test/CalciteMSQTestsHelper.java | 3 +
.../builtin/EarliestLatestAnySqlAggregator.java | 23 +++++++
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 16 ++++-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 75 +++++++++++++++++++++-
.../druid/sql/calcite/CalciteSimpleQueryTest.java | 2 +-
.../druid/sql/calcite/util/CalciteTests.java | 2 +
.../druid/sql/calcite/util/TestDataBuilder.java | 67 ++++++++++++++++++-
.../src/druid-models/metric-spec/metric-spec.tsx | 26 ++++++--
8 files changed, 204 insertions(+), 10 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 5b49c649cc0..f6297b28c64 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -311,6 +311,9 @@ public class CalciteMSQTestsHelper
.inputTmpDir(temporaryFolder.newFolder())
.buildMMappedIndex();
break;
+ case CalciteTests.WIKIPEDIA_FIRST_LAST:
+ index =
TestDataBuilder.makeWikipediaIndexWithAggregation(temporaryFolder.newFolder());
+ break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 21bcc833e04..66bbdf8a49b 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -44,6 +44,9 @@ import org.apache.calcite.util.Optionality;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import
org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
+import
org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
+import
org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -68,6 +71,7 @@ import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -316,6 +320,25 @@ public class EarliestLatestAnySqlAggregator implements
SqlAggregator
public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
{
RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);
+
+ // If complex and of type SerializablePairLong*, return scalar type
+ if (type instanceof RowSignatures.ComplexSqlType) {
+ ColumnType complexColumnType = ((RowSignatures.ComplexSqlType)
type).getColumnType();
+ String complexTypeName = complexColumnType.getComplexTypeName();
+ if (complexTypeName != null) {
+ switch (complexTypeName) {
+ case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
+ return
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+ case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
+ return
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.FLOAT);
+ case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
+ return
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+ default:
+ return
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+ }
+ }
+ }
+
// For non-number and non-string type, which is COMPLEX type, we set the
return type to VARCHAR.
if (!SqlTypeUtil.isNumeric(type) &&
!SqlTypeUtil.isString(type)) {
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 90e75e10f9c..193ba0c7f1b 100644
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -581,7 +581,13 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
),
row(
Pair.of("TABLE_CAT", "druid"),
- Pair.of("TABLE_NAME", "wikipedia"),
+ Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
+ ),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
@@ -661,7 +667,13 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
),
row(
Pair.of("TABLE_CAT", "druid"),
- Pair.of("TABLE_NAME", "wikipedia"),
+ Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
+ ),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 6e918c65e5c..b0e607b61a0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -179,7 +179,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid",
CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
- .add(new Object[]{"druid", "wikipedia", "TABLE", "NO",
"NO"})
+ .add(new Object[]{"druid", CalciteTests.WIKIPEDIA,
"TABLE", "NO", "NO"})
+ .add(new Object[]{"druid",
CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS",
"SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES",
"SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA",
"SYSTEM_TABLE", "NO", "NO"})
@@ -217,7 +218,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid",
CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
- .add(new Object[]{"druid", "wikipedia", "TABLE", "NO",
"NO"})
+ .add(new Object[]{"druid", CalciteTests.WIKIPEDIA,
"TABLE", "NO", "NO"})
+ .add(new Object[]{"druid",
CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS",
"SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES",
"SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA",
"SYSTEM_TABLE", "NO", "NO"})
@@ -1075,6 +1077,75 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
);
}
+  @Test
+  public void testNumericLatestEarliestGroupBy()
+  {
+    // The wikipedia_first_last datasource holds "added" only as ingest-time first/last rollup
+    // metrics (see TestDataBuilder#makeWikipediaIndexWithAggregation). LATEST/EARLIEST over
+    // those columns should plan to the matching typed first/last aggregators and return
+    // plain numeric values.
+    final String sqlText = "SELECT isNew, "
+                           + "LATEST(long_last_added), EARLIEST(long_first_added), "
+                           + "LATEST(float_last_added), EARLIEST(float_first_added), "
+                           + "LATEST(double_last_added), EARLIEST(double_first_added) "
+                           + "FROM wikipedia_first_last GROUP BY isNew";
+
+    final GroupByQuery expectedQuery =
+        GroupByQuery.builder()
+                    .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+                    .setInterval(querySegmentSpec(Filtration.eternity()))
+                    .setGranularity(Granularities.ALL)
+                    .setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
+                    .setAggregatorSpecs(
+                        aggregators(
+                            new LongLastAggregatorFactory("a0", "long_last_added", null),
+                            new LongFirstAggregatorFactory("a1", "long_first_added", null),
+                            new FloatLastAggregatorFactory("a2", "float_last_added", null),
+                            new FloatFirstAggregatorFactory("a3", "float_first_added", null),
+                            new DoubleLastAggregatorFactory("a4", "double_last_added", null),
+                            new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
+                        )
+                    )
+                    .setContext(QUERY_CONTEXT_DEFAULT)
+                    .build();
+
+    // One row per isNew value: latest/earliest "added" as long, float and double.
+    final ImmutableList<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{"false", 182L, 36L, 182.0F, 36.0F, 182.0D, 36.0D},
+        new Object[]{"true", 113L, 345L, 113.0F, 345.0F, 113.0D, 345.0D}
+    );
+
+    testQuery(sqlText, ImmutableList.of(expectedQuery), expectedResults);
+  }
+
+  @Test
+  public void testNumericLatestEarliestWithOpratorsGroupBy()
+  {
+    // Same first/last aggregations as testNumericLatestEarliestGroupBy, but every result feeds
+    // an arithmetic expression. The planner should keep the typed first/last aggregators and
+    // apply the arithmetic as expression post-aggregators of the matching numeric type.
+    final String sqlText = "SELECT isNew, "
+                           + "LATEST(long_last_added)+4, EARLIEST(long_first_added)-4, "
+                           + "LATEST(float_last_added)*2, EARLIEST(float_first_added)/2f, "
+                           + "LATEST(double_last_added)+2.5, EARLIEST(double_first_added)-2.5 "
+                           + "FROM wikipedia_first_last GROUP BY isNew";
+
+    final GroupByQuery expectedQuery =
+        GroupByQuery.builder()
+                    .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+                    .setInterval(querySegmentSpec(Filtration.eternity()))
+                    .setGranularity(Granularities.ALL)
+                    .setDimensions(dimensions(new DefaultDimensionSpec("isNew", "d0")))
+                    .setAggregatorSpecs(
+                        aggregators(
+                            new LongLastAggregatorFactory("a0", "long_last_added", null),
+                            new LongFirstAggregatorFactory("a1", "long_first_added", null),
+                            new FloatLastAggregatorFactory("a2", "float_last_added", null),
+                            new FloatFirstAggregatorFactory("a3", "float_first_added", null),
+                            new DoubleLastAggregatorFactory("a4", "double_last_added", null),
+                            new DoubleFirstAggregatorFactory("a5", "double_first_added", null)
+                        )
+                    )
+                    .setPostAggregatorSpecs(
+                        expressionPostAgg("p0", "(\"a0\" + 4)", ColumnType.LONG),
+                        expressionPostAgg("p1", "(\"a1\" - 4)", ColumnType.LONG),
+                        expressionPostAgg("p2", "(\"a2\" * 2)", ColumnType.FLOAT),
+                        expressionPostAgg("p3", "(\"a3\" / 2)", ColumnType.FLOAT),
+                        expressionPostAgg("p4", "(\"a4\" + 2.5)", ColumnType.DOUBLE),
+                        expressionPostAgg("p5", "(\"a5\" - 2.5)", ColumnType.DOUBLE)
+                    )
+                    .setContext(QUERY_CONTEXT_DEFAULT)
+                    .build();
+
+    // Base values are those of testNumericLatestEarliestGroupBy with the SQL arithmetic applied.
+    final ImmutableList<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{"false", 186L, 32L, 364.0F, 18.0F, 184.5D, 33.5D},
+        new Object[]{"true", 117L, 341L, 226.0F, 172.5F, 115.5D, 342.5D}
+    );
+
+    testQuery(sqlText, ImmutableList.of(expectedQuery), expectedResults);
+  }
+
@Test
public void testStringLatestGroupByWithAlwaysFalseCondition()
{
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
index 1eb6d58bcb8..5dee57dac32 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
@@ -647,7 +647,7 @@ public class CalciteSimpleQueryTest extends
BaseCalciteQueryTest
.expectedQueries(
ImmutableList.of(
GroupByQuery.builder()
- .setDataSource("wikipedia")
+ .setDataSource(CalciteTests.WIKIPEDIA)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index af723c3b27b..a9ca48e90a9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -117,6 +117,8 @@ public class CalciteTests
public static final String SOMEXDATASOURCE = "somexdatasource";
public static final String USERVISITDATASOURCE = "visits";
public static final String DRUID_SCHEMA_NAME = "druid";
+ public static final String WIKIPEDIA = "wikipedia";
+ public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last";
public static final String TEST_SUPERUSER_NAME = "testSuperuser";
public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new
AuthorizerMapper(null)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index 2d3d681220f..53480e2532d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -47,7 +47,12 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
@@ -639,6 +644,57 @@ public class TestDataBuilder
.buildMMappedIndex();
}
+  /**
+   * Builds a rolled-up segment from the sampled wikiticker data in which the numeric
+   * {@code added} field is not a dimension and is available only through six ingest-time
+   * metrics: {@code long_*}, {@code float_*} and {@code double_*} variants of
+   * {@code first_added} and {@code last_added}, each ordered by {@code __time}.
+   *
+   * @param tmpDir parent directory under which the segment directory and the temporary
+   *               input directory are created
+   * @return a memory-mapped index used for the {@code wikipedia_first_last} test datasource
+   */
+  public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir)
+  {
+    final List<DimensionSchema> dimensions = Arrays.asList(
+        new StringDimensionSchema("channel"),
+        new StringDimensionSchema("cityName"),
+        new StringDimensionSchema("comment"),
+        new StringDimensionSchema("countryIsoCode"),
+        new StringDimensionSchema("countryName"),
+        new StringDimensionSchema("isAnonymous"),
+        new StringDimensionSchema("isMinor"),
+        new StringDimensionSchema("isNew"),
+        new StringDimensionSchema("isRobot"),
+        new StringDimensionSchema("isUnpatrolled"),
+        new StringDimensionSchema("metroCode"),
+        new StringDimensionSchema("namespace"),
+        new StringDimensionSchema("page"),
+        new StringDimensionSchema("regionIsoCode"),
+        new StringDimensionSchema("regionName"),
+        new StringDimensionSchema("user")
+    );
+
+    return IndexBuilder
+        .create()
+        .tmpDir(new File(tmpDir, "wikipedia1"))
+        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+        .schema(new IncrementalIndexSchema.Builder()
+                    .withRollup(true)
+                    .withTimestampSpec(new TimestampSpec("time", null, null))
+                    .withDimensionsSpec(new DimensionsSpec(dimensions))
+                    .withMetrics(
+                        new LongLastAggregatorFactory("long_last_added", "added", "__time"),
+                        new LongFirstAggregatorFactory("long_first_added", "added", "__time"),
+                        new FloatLastAggregatorFactory("float_last_added", "added", "__time"),
+                        // Must be a *first* aggregator like its long/double siblings. Previously
+                        // this column was built with FloatLastAggregatorFactory by mistake.
+                        new FloatFirstAggregatorFactory("float_first_added", "added", "__time"),
+                        new DoubleLastAggregatorFactory("double_last_added", "added", "__time"),
+                        new DoubleFirstAggregatorFactory("double_first_added", "added", "__time")
+                    )
+                    .build()
+        )
+        .inputSource(
+            ResourceInputSource.of(
+                TestDataBuilder.class.getClassLoader(),
+                "calcite/tests/wikiticker-2015-09-12-sampled.json.gz"
+            )
+        )
+        .inputFormat(DEFAULT_JSON_INPUT_FORMAT)
+        .inputTmpDir(new File(tmpDir, "tmpWikipedia1"))
+        .buildMMappedIndex();
+  }
+
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final Injector injector,
final QueryRunnerFactoryConglomerate conglomerate,
@@ -873,13 +929,22 @@ public class TestDataBuilder
userVisitIndex
).add(
DataSegment.builder()
- .dataSource("wikipedia")
+ .dataSource(CalciteTests.WIKIPEDIA)
.interval(Intervals.of("2015-09-12/2015-09-13"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
makeWikipediaIndex(tmpDir)
+ ).add(
+ DataSegment.builder()
+ .dataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+ .interval(Intervals.of("2015-09-12/2015-09-13"))
+ .version("1")
+ .shardSpec(new NumberedShardSpec(0, 0))
+ .size(0)
+ .build(),
+ makeWikipediaIndexWithAggregation(tmpDir)
);
}
diff --git a/web-console/src/druid-models/metric-spec/metric-spec.tsx
b/web-console/src/druid-models/metric-spec/metric-spec.tsx
index ee689880d52..99cf11aaed4 100644
--- a/web-console/src/druid-models/metric-spec/metric-spec.tsx
+++ b/web-console/src/druid-models/metric-spec/metric-spec.tsx
@@ -59,6 +59,12 @@ const KNOWN_TYPES = [
'longMax',
'doubleMax',
'floatMax',
+ 'longFirst',
+ 'longLast',
+ 'doubleFirst',
+ 'doubleLast',
+ 'floatFirst',
+ 'floatLast',
'stringFirst',
'stringLast',
'thetaSketch',
@@ -97,10 +103,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
group: 'max',
suggestions: ['longMax', 'doubleMax', 'floatMax'],
},
- // Do not show first and last aggregators as they can not be used in
ingestion specs and this definition is only used in the data loader.
- // Ref:
https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator
- // Should the first / last aggregators become usable at ingestion time,
reverse the changes made in:
- // https://github.com/apache/druid/pull/10794
+ {
+ group: 'first',
+ suggestions: ['longFirst', 'doubleFirst', 'floatFirst', 'stringFirst'],
+ },
+ {
+ group: 'last',
+ suggestions: ['longLast', 'doubleLast', 'floatLast', 'stringLast'],
+ },
'thetaSketch',
'arrayOfDoublesSketch',
{
@@ -129,6 +139,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
'longMax',
'doubleMax',
'floatMax',
+ 'longFirst',
+ 'longLast',
+ 'doubleFirst',
+ 'doubleLast',
+ 'floatFirst',
+ 'floatLast',
+ 'stringFirst',
+ 'stringLast',
'thetaSketch',
'arrayOfDoublesSketch',
'HLLSketchBuild',
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]