This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 355c2f5da00 Add sql + ingestion compatibility for first/last on numeric values (#15607)
355c2f5da00 is described below

commit 355c2f5da003e99439774dbc63ed9ef2016b0d8d
Author: Ankit Kothari <[email protected]>
AuthorDate: Tue Jan 9 23:29:38 2024 -0800

    Add sql + ingestion compatibility for first/last on numeric values (#15607)
    
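    Adds SQL compatibility for numeric first and last column types: EARLIEST and
    LATEST now return BIGINT, FLOAT, or DOUBLE when applied to the corresponding
    pre-aggregated long, float, or double first/last columns.
    The ingestion UI now also offers the first and last aggregations.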
---
 .../druid/msq/test/CalciteMSQTestsHelper.java      |  3 +
 .../builtin/EarliestLatestAnySqlAggregator.java    | 23 +++++++
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java | 16 ++++-
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 75 +++++++++++++++++++++-
 .../druid/sql/calcite/CalciteSimpleQueryTest.java  |  2 +-
 .../druid/sql/calcite/util/CalciteTests.java       |  2 +
 .../druid/sql/calcite/util/TestDataBuilder.java    | 67 ++++++++++++++++++-
 .../src/druid-models/metric-spec/metric-spec.tsx   | 26 ++++++--
 8 files changed, 204 insertions(+), 10 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 5b49c649cc0..f6297b28c64 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -311,6 +311,9 @@ public class CalciteMSQTestsHelper
                               .inputTmpDir(temporaryFolder.newFolder())
                               .buildMMappedIndex();
           break;
+        case CalciteTests.WIKIPEDIA_FIRST_LAST:
+          index = 
TestDataBuilder.makeWikipediaIndexWithAggregation(temporaryFolder.newFolder());
+          break;
         default:
           throw new ISE("Cannot query segment %s in test runner", segmentId);
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 21bcc833e04..66bbdf8a49b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -44,6 +44,9 @@ import org.apache.calcite.util.Optionality;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidSqlInput;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import 
org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde;
+import 
org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde;
+import 
org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde;
 import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -68,6 +71,7 @@ import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.rel.InputAccessor;
 import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignatures;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -316,6 +320,25 @@ public class EarliestLatestAnySqlAggregator implements 
SqlAggregator
     public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
     {
       RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);
+
+      // If complex and of type SerializablePairLong*, return scalar type
+      if (type instanceof RowSignatures.ComplexSqlType) {
+        ColumnType complexColumnType = ((RowSignatures.ComplexSqlType) 
type).getColumnType();
+        String complexTypeName = complexColumnType.getComplexTypeName();
+        if (complexTypeName != null) {
+          switch (complexTypeName) {
+            case SerializablePairLongLongComplexMetricSerde.TYPE_NAME:
+              return 
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+            case SerializablePairLongFloatComplexMetricSerde.TYPE_NAME:
+              return 
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.FLOAT);
+            case SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME:
+              return 
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+            default:
+              return 
sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+          }
+        }
+      }
+
       // For non-number and non-string type, which is COMPLEX type, we set the 
return type to VARCHAR.
       if (!SqlTypeUtil.isNumeric(type) &&
           !SqlTypeUtil.isString(type)) {
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 90e75e10f9c..193ba0c7f1b 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -581,7 +581,13 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
             ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
-                Pair.of("TABLE_NAME", "wikipedia"),
+                Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
                 Pair.of("TABLE_SCHEM", "druid"),
                 Pair.of("TABLE_TYPE", "TABLE")
             )
@@ -661,7 +667,13 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
             ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
-                Pair.of("TABLE_NAME", "wikipedia"),
+                Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.WIKIPEDIA_FIRST_LAST),
                 Pair.of("TABLE_SCHEM", "druid"),
                 Pair.of("TABLE_TYPE", "TABLE")
             )
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 6e918c65e5c..b0e607b61a0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -179,7 +179,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                      .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", 
CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
-                     .add(new Object[]{"druid", "wikipedia", "TABLE", "NO", 
"NO"})
+                     .add(new Object[]{"druid", CalciteTests.WIKIPEDIA, 
"TABLE", "NO", "NO"})
+                     .add(new Object[]{"druid", 
CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
                      .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", 
"SYSTEM_TABLE", "NO", "NO"})
                      .add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", 
"SYSTEM_TABLE", "NO", "NO"})
                      .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", 
"SYSTEM_TABLE", "NO", "NO"})
@@ -217,7 +218,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                      .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", 
CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
-                     .add(new Object[]{"druid", "wikipedia", "TABLE", "NO", 
"NO"})
+                     .add(new Object[]{"druid", CalciteTests.WIKIPEDIA, 
"TABLE", "NO", "NO"})
+                     .add(new Object[]{"druid", 
CalciteTests.WIKIPEDIA_FIRST_LAST, "TABLE", "NO", "NO"})
                      .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", 
"SYSTEM_TABLE", "NO", "NO"})
                      .add(new Object[]{"INFORMATION_SCHEMA", "ROUTINES", 
"SYSTEM_TABLE", "NO", "NO"})
                      .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", 
"SYSTEM_TABLE", "NO", "NO"})
@@ -1075,6 +1077,75 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testNumericLatestEarliestGroupBy()
+  {
+    testQuery(
+        "SELECT isNew, LATEST(long_last_added), EARLIEST(long_first_added), 
LATEST(float_last_added), EARLIEST(float_first_added), 
LATEST(double_last_added), EARLIEST(double_first_added) FROM 
wikipedia_first_last GROUP BY isNew",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new 
DefaultDimensionSpec("isNew", "d0")))
+                        .setAggregatorSpecs(aggregators(
+                                                new 
LongLastAggregatorFactory("a0", "long_last_added", null),
+                                                new 
LongFirstAggregatorFactory("a1", "long_first_added", null),
+                                                new 
FloatLastAggregatorFactory("a2", "float_last_added", null),
+                                                new 
FloatFirstAggregatorFactory("a3", "float_first_added", null),
+                                                new 
DoubleLastAggregatorFactory("a4", "double_last_added", null),
+                                                new 
DoubleFirstAggregatorFactory("a5", "double_first_added", null)
+                                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"false", 182L, 36L, 182.0F, 36.0F, 182.0D, 36.0D},
+            new Object[]{"true", 113L, 345L, 113.0F, 345.0F, 113.0D, 345.0D}
+        )
+    );
+  }
+
+  @Test
+  public void testNumericLatestEarliestWithOpratorsGroupBy()
+  {
+    testQuery(
+        "SELECT isNew, LATEST(long_last_added)+4, 
EARLIEST(long_first_added)-4, LATEST(float_last_added)*2, 
EARLIEST(float_first_added)/2f, LATEST(double_last_added)+2.5, 
EARLIEST(double_first_added)-2.5 FROM wikipedia_first_last GROUP BY isNew",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new 
DefaultDimensionSpec("isNew", "d0")))
+                        .setAggregatorSpecs(aggregators(
+                                                new 
LongLastAggregatorFactory("a0", "long_last_added", null),
+                                                new 
LongFirstAggregatorFactory("a1", "long_first_added", null),
+                                                new 
FloatLastAggregatorFactory("a2", "float_last_added", null),
+                                                new 
FloatFirstAggregatorFactory("a3", "float_first_added", null),
+                                                new 
DoubleLastAggregatorFactory("a4", "double_last_added", null),
+                                                new 
DoubleFirstAggregatorFactory("a5", "double_first_added", null)
+                                            )
+                        )
+                        .setPostAggregatorSpecs(
+                            expressionPostAgg("p0", "(\"a0\" + 4)", 
ColumnType.LONG),
+                            expressionPostAgg("p1", "(\"a1\" - 4)", 
ColumnType.LONG),
+                            expressionPostAgg("p2", "(\"a2\" * 2)", 
ColumnType.FLOAT),
+                            expressionPostAgg("p3", "(\"a3\" / 2)", 
ColumnType.FLOAT),
+                            expressionPostAgg("p4", "(\"a4\" + 2.5)", 
ColumnType.DOUBLE),
+                            expressionPostAgg("p5", "(\"a5\" - 2.5)", 
ColumnType.DOUBLE)
+                        )
+
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"false", 186L, 32L, 364.0F, 18.0F, 184.5D, 33.5D},
+            new Object[]{"true", 117L, 341L, 226.0F, 172.5F, 115.5D, 342.5D}
+        )
+    );
+  }
+
   @Test
   public void testStringLatestGroupByWithAlwaysFalseCondition()
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
index 1eb6d58bcb8..5dee57dac32 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSimpleQueryTest.java
@@ -647,7 +647,7 @@ public class CalciteSimpleQueryTest extends 
BaseCalciteQueryTest
         .expectedQueries(
             ImmutableList.of(
                 GroupByQuery.builder()
-                    .setDataSource("wikipedia")
+                    .setDataSource(CalciteTests.WIKIPEDIA)
                     .setInterval(querySegmentSpec(Filtration.eternity()))
                     .setGranularity(Granularities.ALL)
                     .setVirtualColumns(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index af723c3b27b..a9ca48e90a9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -117,6 +117,8 @@ public class CalciteTests
   public static final String SOMEXDATASOURCE = "somexdatasource";
   public static final String USERVISITDATASOURCE = "visits";
   public static final String DRUID_SCHEMA_NAME = "druid";
+  public static final String WIKIPEDIA = "wikipedia";
+  public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last";
 
   public static final String TEST_SUPERUSER_NAME = "testSuperuser";
   public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new 
AuthorizerMapper(null)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index 2d3d681220f..53480e2532d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -47,7 +47,12 @@ import 
org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
@@ -639,6 +644,57 @@ public class TestDataBuilder
         .buildMMappedIndex();
   }
 
+  public static QueryableIndex makeWikipediaIndexWithAggregation(File tmpDir)
+  {
+    final List<DimensionSchema> dimensions = Arrays.asList(
+        new StringDimensionSchema("channel"),
+        new StringDimensionSchema("cityName"),
+        new StringDimensionSchema("comment"),
+        new StringDimensionSchema("countryIsoCode"),
+        new StringDimensionSchema("countryName"),
+        new StringDimensionSchema("isAnonymous"),
+        new StringDimensionSchema("isMinor"),
+        new StringDimensionSchema("isNew"),
+        new StringDimensionSchema("isRobot"),
+        new StringDimensionSchema("isUnpatrolled"),
+        new StringDimensionSchema("metroCode"),
+        new StringDimensionSchema("namespace"),
+        new StringDimensionSchema("page"),
+        new StringDimensionSchema("regionIsoCode"),
+        new StringDimensionSchema("regionName"),
+        new StringDimensionSchema("user")
+    );
+
+    return IndexBuilder
+        .create()
+        .tmpDir(new File(tmpDir, "wikipedia1"))
+        
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+        .schema(new IncrementalIndexSchema.Builder()
+                    .withRollup(true)
+                    .withTimestampSpec(new TimestampSpec("time", null, null))
+                    .withDimensionsSpec(new DimensionsSpec(dimensions))
+                    .withMetrics(
+                        new LongLastAggregatorFactory("long_last_added", 
"added", "__time"),
+                        new LongFirstAggregatorFactory("long_first_added", 
"added", "__time"),
+                        new FloatLastAggregatorFactory("float_last_added", 
"added", "__time"),
+                        new FloatLastAggregatorFactory("float_first_added", 
"added", "__time"),
+                        new DoubleLastAggregatorFactory("double_last_added", 
"added", "__time"),
+                        new DoubleFirstAggregatorFactory("double_first_added", 
"added", "__time")
+
+                    )
+                    .build()
+        )
+        .inputSource(
+            ResourceInputSource.of(
+                TestDataBuilder.class.getClassLoader(),
+                "calcite/tests/wikiticker-2015-09-12-sampled.json.gz"
+            )
+        )
+        .inputFormat(DEFAULT_JSON_INPUT_FORMAT)
+        .inputTmpDir(new File(tmpDir, "tmpWikipedia1"))
+        .buildMMappedIndex();
+  }
+
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
       final Injector injector,
       final QueryRunnerFactoryConglomerate conglomerate,
@@ -873,13 +929,22 @@ public class TestDataBuilder
         userVisitIndex
     ).add(
         DataSegment.builder()
-                   .dataSource("wikipedia")
+                   .dataSource(CalciteTests.WIKIPEDIA)
                    .interval(Intervals.of("2015-09-12/2015-09-13"))
                    .version("1")
                    .shardSpec(new NumberedShardSpec(0, 0))
                    .size(0)
                    .build(),
         makeWikipediaIndex(tmpDir)
+    ).add(
+      DataSegment.builder()
+                 .dataSource(CalciteTests.WIKIPEDIA_FIRST_LAST)
+                 .interval(Intervals.of("2015-09-12/2015-09-13"))
+                 .version("1")
+                 .shardSpec(new NumberedShardSpec(0, 0))
+                 .size(0)
+                 .build(),
+      makeWikipediaIndexWithAggregation(tmpDir)
     );
   }
 
diff --git a/web-console/src/druid-models/metric-spec/metric-spec.tsx b/web-console/src/druid-models/metric-spec/metric-spec.tsx
index ee689880d52..99cf11aaed4 100644
--- a/web-console/src/druid-models/metric-spec/metric-spec.tsx
+++ b/web-console/src/druid-models/metric-spec/metric-spec.tsx
@@ -59,6 +59,12 @@ const KNOWN_TYPES = [
   'longMax',
   'doubleMax',
   'floatMax',
+  'longFirst',
+  'longLast',
+  'doubleFirst',
+  'doubleLast',
+  'floatFirst',
+  'floatLast',
   'stringFirst',
   'stringLast',
   'thetaSketch',
@@ -97,10 +103,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
         group: 'max',
         suggestions: ['longMax', 'doubleMax', 'floatMax'],
       },
-      // Do not show first and last aggregators as they can not be used in 
ingestion specs and this definition is only used in the data loader.
-      // Ref: 
https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator
-      // Should the first / last aggregators become usable at ingestion time, 
reverse the changes made in:
-      // https://github.com/apache/druid/pull/10794
+      {
+        group: 'first',
+        suggestions: ['longFirst', 'doubleFirst', 'floatFirst', 'stringFirst'],
+      },
+      {
+        group: 'last',
+        suggestions: ['longLast', 'doubleLast', 'floatLast', 'stringLast'],
+      },
       'thetaSketch',
       'arrayOfDoublesSketch',
       {
@@ -129,6 +139,14 @@ export const METRIC_SPEC_FIELDS: Field<MetricSpec>[] = [
       'longMax',
       'doubleMax',
       'floatMax',
+      'longFirst',
+      'longLast',
+      'doubleFirst',
+      'doubleLast',
+      'floatFirst',
+      'floatLast',
+      'stringFirst',
+      'stringLast',
       'thetaSketch',
       'arrayOfDoublesSketch',
       'HLLSketchBuild',


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to