This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new 9647219 [0.17.0] Speed up String first/last aggregators when folding
isn't needed. (#9181) (#9215)
9647219 is described below
commit 9647219c90a6d8d63dc2d7fadd1c700bacb8718b
Author: Suneet Saldanha <[email protected]>
AuthorDate: Sat Jan 18 04:12:50 2020 -0800
[0.17.0] Speed up String first/last aggregators when folding isn't needed.
(#9181) (#9215)
* Speed up String first/last aggregators when folding isn't needed. (#9181)
* Speed up String first/last aggregators when folding isn't needed.
Examines the value column, and disables fold checking via a needsFoldCheck
flag if that column can't possibly contain SerializablePairLongString objects. This
is helpful because it avoids calling getObject on the value selector when
unnecessary; say, because the time selector didn't yield an earlier or later
value.
* PR comments.
* Move fastLooseChop to StringUtils.
* actually fix conflict correctly
* remove unused import
Co-authored-by: Gian Merlino <[email protected]>
---
.../apache/druid/java/util/common/StringUtils.java | 35 ++++++++++++++
.../druid/java/util/common/StringUtilsTest.java | 28 +++++++++++
.../aggregation/first/StringFirstAggregator.java | 47 ++++++++++++-------
.../first/StringFirstAggregatorFactory.java | 13 ++++--
.../first/StringFirstBufferAggregator.java | 54 ++++++++++++++++------
.../aggregation/first/StringFirstLastUtils.java | 35 +++++++++-----
.../aggregation/last/StringLastAggregator.java | 47 +++++++++++--------
.../last/StringLastAggregatorFactory.java | 14 ++++--
.../last/StringLastBufferAggregator.java | 54 ++++++++++++++++------
.../first/StringFirstAggregationTest.java | 8 +++-
.../first/StringFirstBufferAggregatorTest.java | 46 ++++++++++++++++--
.../last/StringLastAggregationTest.java | 5 ++
.../last/StringLastBufferAggregatorTest.java | 50 ++++++++++++++++++--
13 files changed, 341 insertions(+), 95 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index 7485802..33a4e3c 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -546,4 +546,39 @@ public class StringUtils
return new String(data);
}
+ /**
+ * Returns the string truncated to maxBytes.
+ * If given string input is shorter than maxBytes, then it remains the same.
+ *
+ * @param s The input string to possibly be truncated
+ * @param maxBytes The max bytes that string input will be truncated to
+ *
+ * @return the string after truncated to maxBytes
+ */
+ @Nullable
+ public static String chop(@Nullable final String s, final int maxBytes)
+ {
+ if (s == null) {
+ return null;
+ } else {
+ // Shorten firstValue to what could fit in maxBytes as UTF-8.
+ final byte[] bytes = new byte[maxBytes];
+ final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
+ return new String(bytes, 0, len, StandardCharsets.UTF_8);
+ }
+ }
+
+ /**
+ * Shorten "s" to "maxBytes" chars. Fast and loose because these are *chars*
not *bytes*. Use
+ * {@link #chop(String, int)} for slower, but accurate chopping.
+ */
+ @Nullable
+ public static String fastLooseChop(@Nullable final String s, final int
maxBytes)
+ {
+ if (s == null || s.length() <= maxBytes) {
+ return s;
+ } else {
+ return s.substring(0, maxBytes);
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
index e9f5f21..1d4cb6d 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
@@ -246,4 +246,32 @@ public class StringUtilsTest
Assert.assertEquals(s5, null);
}
+ @Test
+ public void testChop()
+ {
+ Assert.assertEquals("foo", StringUtils.chop("foo", 5));
+ Assert.assertEquals("fo", StringUtils.chop("foo", 2));
+ Assert.assertEquals("", StringUtils.chop("foo", 0));
+ Assert.assertEquals("smile 🙂 for", StringUtils.chop("smile 🙂 for the
camera", 14));
+ Assert.assertEquals("smile 🙂", StringUtils.chop("smile 🙂 for the camera",
10));
+ Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera",
9));
+ Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera",
8));
+ Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera",
7));
+ Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera",
6));
+ Assert.assertEquals("smile", StringUtils.chop("smile 🙂 for the camera",
5));
+ }
+
+ @Test
+ public void testFastLooseChop()
+ {
+ Assert.assertEquals("foo", StringUtils.fastLooseChop("foo", 5));
+ Assert.assertEquals("fo", StringUtils.fastLooseChop("foo", 2));
+ Assert.assertEquals("", StringUtils.fastLooseChop("foo", 0));
+ Assert.assertEquals("smile 🙂 for", StringUtils.fastLooseChop("smile 🙂 for
the camera", 12));
+ Assert.assertEquals("smile 🙂 ", StringUtils.fastLooseChop("smile 🙂 for the
camera", 9));
+ Assert.assertEquals("smile 🙂", StringUtils.fastLooseChop("smile 🙂 for the
camera", 8));
+ Assert.assertEquals("smile \uD83D", StringUtils.fastLooseChop("smile 🙂 for
the camera", 7));
+ Assert.assertEquals("smile ", StringUtils.fastLooseChop("smile 🙂 for the
camera", 6));
+ Assert.assertEquals("smile", StringUtils.fastLooseChop("smile 🙂 for the
camera", 5));
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
index 0260044..2d5ee99 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
@@ -20,32 +20,34 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
-
-import javax.annotation.Nullable;
+import org.apache.druid.segment.DimensionHandlerUtils;
public class StringFirstAggregator implements Aggregator
{
- @Nullable
private final BaseLongColumnValueSelector timeSelector;
- private final BaseObjectColumnValueSelector valueSelector;
+ private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
+ private final boolean needsFoldCheck;
protected long firstTime;
protected String firstValue;
public StringFirstAggregator(
- @Nullable BaseLongColumnValueSelector timeSelector,
- BaseObjectColumnValueSelector valueSelector,
- int maxStringBytes
+ BaseLongColumnValueSelector timeSelector,
+ BaseObjectColumnValueSelector<?> valueSelector,
+ int maxStringBytes,
+ boolean needsFoldCheck
)
{
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
this.maxStringBytes = maxStringBytes;
+ this.needsFoldCheck = needsFoldCheck;
firstTime = DateTimes.MAX.getMillis();
firstValue = null;
@@ -54,17 +56,28 @@ public class StringFirstAggregator implements Aggregator
@Override
public void aggregate()
{
- final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
- timeSelector,
- valueSelector
- );
+ if (needsFoldCheck) {
+ // Less efficient code path when folding is a possibility (we must read
the value selector first just in case
+ // it's a foldable object).
+ final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
+ timeSelector,
+ valueSelector
+ );
+
+ if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
+ firstTime = inPair.lhs;
+ firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
+ }
+ } else {
+ final long time = timeSelector.getLong();
- if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
- firstTime = inPair.lhs;
- firstValue = inPair.rhs;
+ if (time < firstTime) {
+ final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
- if (firstValue.length() > maxStringBytes) {
- firstValue = firstValue.substring(0, maxStringBytes);
+ if (value != null) {
+ firstTime = time;
+ firstValue = StringUtils.fastLooseChop(value, maxStringBytes);
+ }
}
}
}
@@ -72,7 +85,7 @@ public class StringFirstAggregator implements Aggregator
@Override
public Object get()
{
- return new SerializablePairLongString(firstTime,
StringFirstLastUtils.chop(firstValue, maxStringBytes));
+ return new SerializablePairLongString(firstTime,
StringUtils.chop(firstValue, maxStringBytes));
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 1b30bf7..6ad8558 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
@@ -118,20 +119,24 @@ public class StringFirstAggregatorFactory extends
AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
return new StringFirstAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName),
- maxStringBytes
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
return new StringFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName),
- maxStringBytes
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
index 5a4c006..b7d5ac8 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -25,6 +25,7 @@ import
org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
import java.nio.ByteBuffer;
@@ -36,18 +37,21 @@ public class StringFirstBufferAggregator implements
BufferAggregator
);
private final BaseLongColumnValueSelector timeSelector;
- private final BaseObjectColumnValueSelector valueSelector;
+ private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
+ private final boolean needsFoldCheck;
public StringFirstBufferAggregator(
BaseLongColumnValueSelector timeSelector,
- BaseObjectColumnValueSelector valueSelector,
- int maxStringBytes
+ BaseObjectColumnValueSelector<?> valueSelector,
+ int maxStringBytes,
+ boolean needsFoldCheck
)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
+ this.needsFoldCheck = needsFoldCheck;
}
@Override
@@ -59,20 +63,40 @@ public class StringFirstBufferAggregator implements
BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
- final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
- timeSelector,
- valueSelector
- );
+ if (needsFoldCheck) {
+ // Less efficient code path when folding is a possibility (we must read
the value selector first just in case
+ // it's a foldable object).
+ final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
+ timeSelector,
+ valueSelector
+ );
- if (inPair != null && inPair.rhs != null) {
+ if (inPair != null && inPair.rhs != null) {
+ final long firstTime = buf.getLong(position);
+ if (inPair.lhs < firstTime) {
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(inPair.lhs, inPair.rhs),
+ maxStringBytes
+ );
+ }
+ }
+ } else {
+ final long time = timeSelector.getLong();
final long firstTime = buf.getLong(position);
- if (inPair.lhs < firstTime) {
- StringFirstLastUtils.writePair(
- buf,
- position,
- new SerializablePairLongString(inPair.lhs, inPair.rhs),
- maxStringBytes
- );
+
+ if (time < firstTime) {
+ final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
+
+ if (value != null) {
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(time, value),
+ maxStringBytes
+ );
+ }
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 630f70c..910cb94 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -24,32 +24,45 @@ import
org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
public class StringFirstLastUtils
{
private static final int NULL_VALUE = -1;
- @Nullable
- public static String chop(@Nullable final String s, final int maxBytes)
+ /**
+ * Returns whether a given value selector *might* contain
SerializablePairLongString objects.
+ */
+ public static boolean selectorNeedsFoldCheck(
+ final BaseObjectColumnValueSelector<?> valueSelector,
+ @Nullable final ColumnCapabilities valueSelectorCapabilities
+ )
{
- if (s == null) {
- return null;
- } else {
- // Shorten firstValue to what could fit in maxBytes as UTF-8.
- final byte[] bytes = new byte[maxBytes];
- final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
- return new String(bytes, 0, len, StandardCharsets.UTF_8);
+ if (valueSelectorCapabilities != null &&
valueSelectorCapabilities.getType() != ValueType.COMPLEX) {
+ // Known, non-complex type.
+ return false;
}
+
+ if (valueSelector instanceof NilColumnValueSelector) {
+ // Nil column, definitely no SerializablePairLongStrings.
+ return false;
+ }
+
+ // Check if the selector class could possibly be a
SerializablePairLongString (either a superclass or subclass).
+ final Class<?> clazz = valueSelector.classOfObject();
+ return clazz.isAssignableFrom(SerializablePairLongString.class)
+ || SerializablePairLongString.class.isAssignableFrom(clazz);
}
@Nullable
public static SerializablePairLongString readPairFromSelectors(
final BaseLongColumnValueSelector timeSelector,
- final BaseObjectColumnValueSelector valueSelector
+ final BaseObjectColumnValueSelector<?> valueSelector
)
{
final long time;
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
index 01be5db..ea37ff4 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
@@ -20,30 +20,35 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
public class StringLastAggregator implements Aggregator
{
private final BaseLongColumnValueSelector timeSelector;
- private final BaseObjectColumnValueSelector valueSelector;
+ private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
+ private final boolean needsFoldCheck;
protected long lastTime;
protected String lastValue;
public StringLastAggregator(
- BaseLongColumnValueSelector timeSelector,
- BaseObjectColumnValueSelector valueSelector,
- int maxStringBytes
+ final BaseLongColumnValueSelector timeSelector,
+ final BaseObjectColumnValueSelector<?> valueSelector,
+ final int maxStringBytes,
+ final boolean needsFoldCheck
)
{
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
this.maxStringBytes = maxStringBytes;
+ this.needsFoldCheck = needsFoldCheck;
lastTime = DateTimes.MIN.getMillis();
lastValue = null;
@@ -52,22 +57,28 @@ public class StringLastAggregator implements Aggregator
@Override
public void aggregate()
{
- final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
- timeSelector,
- valueSelector
- );
+ if (needsFoldCheck) {
+ // Less efficient code path when folding is a possibility (we must read
the value selector first just in case
+ // it's a foldable object).
+ final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
+ timeSelector,
+ valueSelector
+ );
- if (inPair == null) {
- // Don't aggregate nulls.
- return;
- }
+ if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
+ lastTime = inPair.lhs;
+ lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
+ }
+ } else {
+ final long time = timeSelector.getLong();
- if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
- lastTime = inPair.lhs;
- lastValue = inPair.rhs;
+ if (time >= lastTime) {
+ final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
- if (lastValue.length() > maxStringBytes) {
- lastValue = lastValue.substring(0, maxStringBytes);
+ if (value != null) {
+ lastTime = time;
+ lastValue = StringUtils.fastLooseChop(value, maxStringBytes);
+ }
}
}
}
@@ -75,7 +86,7 @@ public class StringLastAggregator implements Aggregator
@Override
public Object get()
{
- return new SerializablePairLongString(lastTime,
StringFirstLastUtils.chop(lastValue, maxStringBytes));
+ return new SerializablePairLongString(lastTime,
StringUtils.chop(lastValue, maxStringBytes));
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index 9277d05..9a3264f 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -32,7 +32,9 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
@@ -74,20 +76,24 @@ public class StringLastAggregatorFactory extends
AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
return new StringLastAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName),
- maxStringBytes
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
return new StringLastBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName),
- maxStringBytes
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
index 30ea428..09e3276 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -26,6 +26,7 @@ import
org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
import java.nio.ByteBuffer;
@@ -37,18 +38,21 @@ public class StringLastBufferAggregator implements
BufferAggregator
);
private final BaseLongColumnValueSelector timeSelector;
- private final BaseObjectColumnValueSelector valueSelector;
+ private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
+ private final boolean needsFoldCheck;
public StringLastBufferAggregator(
BaseLongColumnValueSelector timeSelector,
- BaseObjectColumnValueSelector valueSelector,
- int maxStringBytes
+ BaseObjectColumnValueSelector<?> valueSelector,
+ int maxStringBytes,
+ boolean needsFoldCheck
)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
+ this.needsFoldCheck = needsFoldCheck;
}
@Override
@@ -60,20 +64,40 @@ public class StringLastBufferAggregator implements
BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
- final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
- timeSelector,
- valueSelector
- );
+ if (needsFoldCheck) {
+ // Less efficient code path when folding is a possibility (we must read
the value selector first just in case
+ // it's a foldable object).
+ final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromSelectors(
+ timeSelector,
+ valueSelector
+ );
- if (inPair != null && inPair.rhs != null) {
+ if (inPair != null && inPair.rhs != null) {
+ final long lastTime = buf.getLong(position);
+ if (inPair.lhs >= lastTime) {
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(inPair.lhs, inPair.rhs),
+ maxStringBytes
+ );
+ }
+ }
+ } else {
+ final long time = timeSelector.getLong();
final long lastTime = buf.getLong(position);
- if (inPair.lhs >= lastTime) {
- StringFirstLastUtils.writePair(
- buf,
- position,
- new SerializablePairLongString(inPair.lhs, inPair.rhs),
- maxStringBytes
- );
+
+ if (time >= lastTime) {
+ final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
+
+ if (value != null) {
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(time, value),
+ maxStringBytes
+ );
+ }
}
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
index 190c343..0e450d3 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -28,7 +28,9 @@ import
org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@@ -68,6 +70,9 @@ public class StringFirstAggregationTest
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+ EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
+ .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
+
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
EasyMock.replay(colSelectorFactory);
}
@@ -133,8 +138,7 @@ public class StringFirstAggregationTest
@Test
public void testStringFirstCombiningBufferAggregator()
{
- BufferAggregator agg = combiningAggFactory.factorizeBuffered(
- colSelectorFactory);
+ BufferAggregator agg =
combiningAggFactory.factorizeBuffered(colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new
byte[stringFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
index 04700fa..3b4ef69 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -46,7 +46,6 @@ public class StringFirstBufferAggregatorTest
@Test
public void testBufferAggregate()
{
-
final long[] timestamps = {1526724600L, 1526724700L, 1526724800L,
1526725900L, 1526725000L};
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
Integer maxStringBytes = 1024;
@@ -61,7 +60,8 @@ public class StringFirstBufferAggregatorTest
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
- maxStringBytes
+ maxStringBytes,
+ false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -78,7 +78,43 @@ public class StringFirstBufferAggregatorTest
Assert.assertEquals("expected last string value", strings[0], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new
Long(timestamps[0]), new Long(sp.lhs));
+ }
+
+ @Test
+ public void testBufferAggregateWithFoldCheck()
+ {
+ final long[] timestamps = {1526724600L, 1526724700L, 1526724800L,
1526725900L, 1526725000L};
+ final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new
TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<String> objectColumnSelector = new
TestObjectColumnSelector<>(strings);
+
+ StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes,
+ true
+ );
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf,
position);
+ }
+
+ SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf,
position));
+
+
+ Assert.assertEquals("expected last string value", strings[0], sp.rhs);
+ Assert.assertEquals("last string timestamp is the biggest", new
Long(timestamps[0]), new Long(sp.lhs));
}
@Test
@@ -99,7 +135,8 @@ public class StringFirstBufferAggregatorTest
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
- maxStringBytes
+ maxStringBytes,
+ false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -137,7 +174,8 @@ public class StringFirstBufferAggregatorTest
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
- maxStringBytes
+ maxStringBytes,
+ false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
index 2e22f9c..39f9925 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -28,7 +28,9 @@ import
org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@@ -68,6 +70,9 @@ public class StringLastAggregationTest
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+ EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
+ .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
+
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
EasyMock.replay(colSelectorFactory);
}
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
index 18a3788..6c350c4 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -61,7 +61,8 @@ public class StringLastBufferAggregatorTest
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
- maxStringBytes
+ maxStringBytes,
+ false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -76,12 +77,49 @@ public class StringLastBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf,
position));
- Assert.assertEquals("expectec last string value", "DDDD", sp.rhs);
+ Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new
Long(1526725900L), new Long(sp.lhs));
}
@Test
+ public void testBufferAggregateWithFoldCheck()
+ {
+ final long[] timestamps = {1526724600L, 1526724700L, 1526724800L,
1526725900L, 1526725000L};
+ final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new
TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<String> objectColumnSelector = new
TestObjectColumnSelector<>(strings);
+
+ StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringLastBufferAggregator agg = new StringLastBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes,
+ true
+ );
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf,
position);
+ }
+
+ SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf,
position));
+
+
+ Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
+ Assert.assertEquals("last string timestamp is the biggest", new
Long(1526725900L), new Long(sp.lhs));
+ }
+
+ @Test
public void testNullBufferAggregate()
{
@@ -99,7 +137,8 @@ public class StringLastBufferAggregatorTest
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
- maxStringBytes
+ maxStringBytes,
+ false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -114,7 +153,7 @@ public class StringLastBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf,
position));
- Assert.assertEquals("expectec last string value", strings[2], sp.rhs);
+ Assert.assertEquals("expected last string value", strings[2], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new
Long(timestamps[2]), new Long(sp.lhs));
}
@@ -137,7 +176,8 @@ public class StringLastBufferAggregatorTest
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
- maxStringBytes
+ maxStringBytes,
+ false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]