This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new e92e109 Fix LATEST / EARLIEST Buffer Aggregator does not work on
String column (#9197) (#9210)
e92e109 is described below
commit e92e1094d0db78a41e9d27402664e0d85b6aa180
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Jan 17 18:42:22 2020 -0800
Fix LATEST / EARLIEST Buffer Aggregator does not work on String column
(#9197) (#9210)
* fix buff limit bug
* add tests
* add test
* add tests
* fix checkstyle
---
.../aggregation/first/StringFirstLastUtils.java | 2 +-
.../first/StringFirstLastUtilsTest.java | 59 +++++++++
.../apache/druid/sql/calcite/CalciteQueryTest.java | 146 ++++++++++++++++++++-
3 files changed, 201 insertions(+), 6 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 133c4ba..630f70c 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -87,7 +87,7 @@ public class StringFirstLastUtils
if (pair.rhs != null) {
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
- mutationBuffer.limit(maxStringBytes);
+ mutationBuffer.limit(position + Long.BYTES + Integer.BYTES +
maxStringBytes);
final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer);
mutationBuffer.putInt(position + Long.BYTES, len);
} else {
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
new file mode 100644
index 0000000..b4e4088
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstLastUtilsTest
+{
+ private static final SerializablePairLongString PAIR_TO_WRITE = new
SerializablePairLongString(
+ DateTimes.MAX.getMillis(),
+ "asdasddsaasd"
+ );
+
+ private static final int BUFFER_CAPACITY = 100;
+ // The string in PAIR_TO_WRITE is 12 bytes, so a MAX_BYTE_TO_WRITE of 15 is more
than enough
+ private static final int MAX_BYTE_TO_WRITE = 15;
+
+ @Test
+ public void testWritePairThenReadPairAtBeginningBuffer()
+ {
+ int positionAtBeginning = 0;
+ ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+ StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE,
MAX_BYTE_TO_WRITE);
+ SerializablePairLongString actual = StringFirstLastUtils.readPair(buf,
positionAtBeginning);
+ Assert.assertEquals(PAIR_TO_WRITE, actual);
+ }
+
+ @Test
+ public void testWritePairThenReadPairAtMiddleBuffer()
+ {
+ int positionAtMiddle = 60;
+ ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY);
+ StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE,
MAX_BYTE_TO_WRITE);
+ SerializablePairLongString actual = StringFirstLastUtils.readPair(buf,
positionAtMiddle);
+ Assert.assertEquals(PAIR_TO_WRITE, actual);
+ }
+}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 895b452..8fbb1ad 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -44,11 +44,13 @@ import
org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
@@ -1298,13 +1300,13 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
}
@Test
- public void testLatestInSubquery() throws Exception
+ public void testPrimitiveLatestInSubquery() throws Exception
{
// Cannot vectorize LATEST aggregator.
skipVectorize();
testQuery(
- "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP
BY dim2)",
+ "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1)
AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
@@ -1313,7 +1315,141 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new
DefaultDimensionSpec("dim2", "d0")))
- .setAggregatorSpecs(aggregators(new
FloatLastAggregatorFactory("a0:a", "m1")))
+ .setAggregatorSpecs(aggregators(
+ new
FloatLastAggregatorFactory("a0:a", "m1"),
+ new
LongLastAggregatorFactory("a1:a", "cnt"),
+ new
DoubleLastAggregatorFactory("a2:a", "m2"))
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new
FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+ new
FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+ new
FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(
+ new DoubleSumAggregatorFactory("_a0", "a0"),
+ new LongSumAggregatorFactory("_a1", "a1"),
+ new DoubleSumAggregatorFactory("_a2", "a2")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L,
18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0})
+ );
+ }
+
+ // This tests the off-heap (buffer) version of the EarliestAggregator
(Double/Float/Long)
+ @Test
+ public void testPrimitiveEarliestInSubquery() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2,
EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo
GROUP BY dim2)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ GroupByQuery.builder()
+
.setDataSource(CalciteTests.DATASOURCE1)
+
.setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new
DefaultDimensionSpec("dim2", "d0")))
+ .setAggregatorSpecs(aggregators(
+ new
FloatFirstAggregatorFactory("a0:a", "m1"),
+ new
LongFirstAggregatorFactory("a1:a", "cnt"),
+ new
DoubleFirstAggregatorFactory("a2:a", "m2"))
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new
FinalizingFieldAccessPostAggregator("a0", "a0:a"),
+ new
FinalizingFieldAccessPostAggregator("a1", "a1:a"),
+ new
FinalizingFieldAccessPostAggregator("a2", "a2:a")
+
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(
+ new DoubleSumAggregatorFactory("_a0", "a0"),
+ new LongSumAggregatorFactory("_a1", "a1"),
+ new DoubleSumAggregatorFactory("_a2", "a2")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L,
11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0})
+ );
+ }
+
+ // This tests the off-heap (buffer) version of the LatestAggregator (String)
+ @Test
+ public void testStringLatestInSubquery() throws Exception
+ {
+ // Cannot vectorize LATEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo
GROUP BY dim2)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ GroupByQuery.builder()
+
.setDataSource(CalciteTests.DATASOURCE1)
+
.setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new
DefaultDimensionSpec("dim2", "d0")))
+ .setAggregatorSpecs(aggregators(new
StringLastAggregatorFactory("a0:a", "dim1", 10)))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new
FinalizingFieldAccessPostAggregator("a0", "a0:a")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(new
DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')",
ExprMacroTable.nil())))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0}
+ )
+ );
+ }
+
+ // This tests the off-heap (buffer) version of the EarliestAggregator (String)
+ @Test
+ public void testStringEarliestInSubquery() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo
GROUP BY dim2)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ GroupByQuery.builder()
+
.setDataSource(CalciteTests.DATASOURCE1)
+
.setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new
DefaultDimensionSpec("dim2", "d0")))
+ .setAggregatorSpecs(aggregators(new
StringFirstAggregatorFactory("a0:a", "dim1", 10)))
.setPostAggregatorSpecs(
ImmutableList.of(
new
FinalizingFieldAccessPostAggregator("a0", "a0:a")
@@ -1324,12 +1460,12 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
- .setAggregatorSpecs(aggregators(new
DoubleSumAggregatorFactory("_a0", "a0")))
+ .setAggregatorSpecs(aggregators(new
DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')",
ExprMacroTable.nil())))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
- new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0}
+ new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
)
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]