This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.17.0 by this push:
     new e92e109  Fix LATEST / EARLIEST Buffer Aggregator does not work on 
String column  (#9197) (#9210)
e92e109 is described below

commit e92e1094d0db78a41e9d27402664e0d85b6aa180
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Jan 17 18:42:22 2020 -0800

    Fix LATEST / EARLIEST Buffer Aggregator does not work on String column  
(#9197) (#9210)
    
    * fix buff limit bug
    
    * add tests
    
    * add test
    
    * add tests
    
    * fix checkstyle
---
 .../aggregation/first/StringFirstLastUtils.java    |   2 +-
 .../first/StringFirstLastUtilsTest.java            |  59 +++++++++
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 146 ++++++++++++++++++++-
 3 files changed, 201 insertions(+), 6 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 133c4ba..630f70c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -87,7 +87,7 @@ public class StringFirstLastUtils
 
     if (pair.rhs != null) {
       mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
-      mutationBuffer.limit(maxStringBytes);
+      mutationBuffer.limit(position + Long.BYTES + Integer.BYTES + 
maxStringBytes);
       final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer);
       mutationBuffer.putInt(position + Long.BYTES, len);
     } else {
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
new file mode 100644
index 0000000..b4e4088
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstLastUtilsTest
+{
+  private static final SerializablePairLongString PAIR_TO_WRITE = new 
SerializablePairLongString(
+      DateTimes.MAX.getMillis(),
+      "asdasddsaasd"
+  );
+
+  private static final int BUFFER_CAPACITY = 100;
+  // The string in PAIR_TO_WRITE is 12 bytes long, so a MAX_BYTE_TO_WRITE of 15 is 
more than enough
+  private static final int MAX_BYTE_TO_WRITE = 15;
+
+  @Test
+  public void testWritePairThenReadPairAtBeginningBuffer()
+  {
+    int positionAtBeginning = 0;
+    ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+    StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, 
MAX_BYTE_TO_WRITE);
+    SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, 
positionAtBeginning);
+    Assert.assertEquals(PAIR_TO_WRITE, actual);
+  }
+
+  @Test
+  public void testWritePairThenReadPairAtMiddleBuffer()
+  {
+    int positionAtMiddle = 60;
+    ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+    StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE, 
MAX_BYTE_TO_WRITE);
+    SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, 
positionAtMiddle);
+    Assert.assertEquals(PAIR_TO_WRITE, actual);
+  }
+}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 895b452..8fbb1ad 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -44,11 +44,13 @@ import 
org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import 
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
@@ -1298,13 +1300,13 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
   }
 
   @Test
-  public void testLatestInSubquery() throws Exception
+  public void testPrimitiveLatestInSubquery() throws Exception
   {
     // Cannot vectorize LATEST aggregator.
     skipVectorize();
 
     testQuery(
-        "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP 
BY dim2)",
+        "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1) 
AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)",
         ImmutableList.of(
             GroupByQuery.builder()
                         .setDataSource(
@@ -1313,7 +1315,141 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
                                         
.setInterval(querySegmentSpec(Filtration.eternity()))
                                         .setGranularity(Granularities.ALL)
                                         .setDimensions(dimensions(new 
DefaultDimensionSpec("dim2", "d0")))
-                                        .setAggregatorSpecs(aggregators(new 
FloatLastAggregatorFactory("a0:a", "m1")))
+                                        .setAggregatorSpecs(aggregators(
+                                            new 
FloatLastAggregatorFactory("a0:a", "m1"),
+                                            new 
LongLastAggregatorFactory("a1:a", "cnt"),
+                                            new 
DoubleLastAggregatorFactory("a2:a", "m2"))
+                                        )
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new 
FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+                                                new 
FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+                                                new 
FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(
+                            new DoubleSumAggregatorFactory("_a0", "a0"),
+                            new LongSumAggregatorFactory("_a1", "a1"),
+                            new DoubleSumAggregatorFactory("_a2", "a2")
+                                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L, 
18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0})
+    );
+  }
+
+  // This tests the off-heap (buffer) version of the EarliestAggregator 
(Double/Float/Long)
+  @Test
+  public void testPrimitiveEarliestInSubquery() throws Exception
+  {
+    // Cannot vectorize EARLIEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, 
EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo 
GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        
.setDataSource(CalciteTests.DATASOURCE1)
+                                        
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new 
DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(
+                                            new 
FloatFirstAggregatorFactory("a0:a", "m1"),
+                                            new 
LongFirstAggregatorFactory("a1:a", "cnt"),
+                                            new 
DoubleFirstAggregatorFactory("a2:a", "m2"))
+                                        )
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new 
FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+                                                new 
FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+                                                new 
FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(
+                            new DoubleSumAggregatorFactory("_a0", "a0"),
+                            new LongSumAggregatorFactory("_a1", "a1"),
+                            new DoubleSumAggregatorFactory("_a2", "a2")
+                                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 
11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0})
+    );
+  }
+
+  // This tests the off-heap (buffer) version of the LatestAggregator (String)
+  @Test
+  public void testStringLatestInSubquery() throws Exception
+  {
+    // Cannot vectorize LATEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo 
GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        
.setDataSource(CalciteTests.DATASOURCE1)
+                                        
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new 
DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(new 
StringLastAggregatorFactory("a0:a", "dim1", 10)))
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new 
FinalizingFieldAccessPostAggregator("a0", "a0:a")
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(new 
DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", 
ExprMacroTable.nil())))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0}
+        )
+    );
+  }
+
+  // This tests the off-heap (buffer) version of the EarliestAggregator (String)
+  @Test
+  public void testStringEarliestInSubquery() throws Exception
+  {
+    // Cannot vectorize EARLIEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo 
GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        
.setDataSource(CalciteTests.DATASOURCE1)
+                                        
.setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new 
DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(new 
StringFirstAggregatorFactory("a0:a", "dim1", 10)))
                                         .setPostAggregatorSpecs(
                                             ImmutableList.of(
                                                 new 
FinalizingFieldAccessPostAggregator("a0", "a0:a")
@@ -1324,12 +1460,12 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
                         )
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setGranularity(Granularities.ALL)
-                        .setAggregatorSpecs(aggregators(new 
DoubleSumAggregatorFactory("_a0", "a0")))
+                        .setAggregatorSpecs(aggregators(new 
DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", 
ExprMacroTable.nil())))
                         .setContext(QUERY_CONTEXT_DEFAULT)
                         .build()
         ),
         ImmutableList.of(
-            new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0}
+            new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
         )
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to