This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.17.0 by this push:
     new 9647219  [0.17.0] Speed up String first/last aggregators when folding isn't needed. (#9181) (#9215)
9647219 is described below

commit 9647219c90a6d8d63dc2d7fadd1c700bacb8718b
Author: Suneet Saldanha <[email protected]>
AuthorDate: Sat Jan 18 04:12:50 2020 -0800

    [0.17.0] Speed up String first/last aggregators when folding isn't needed. (#9181) (#9215)
    
    * Speed up String first/last aggregators when folding isn't needed. (#9181)
    
    * Speed up String first/last aggregators when folding isn't needed.
    
    Examines the value column, and disables fold checking via a needsFoldCheck
    flag if that column can't possibly contain SerializablePairLongString
    objects. This is helpful because it avoids calling getObject on the value
    selector when unnecessary; for example, when the time selector didn't
    yield an earlier or later value.
    
    * PR comments.
    
    * Move fastLooseChop to StringUtils.
    
    * actually fix conflict correctly
    
    * remove unused import
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../apache/druid/java/util/common/StringUtils.java | 35 ++++++++++++++
 .../druid/java/util/common/StringUtilsTest.java    | 28 +++++++++++
 .../aggregation/first/StringFirstAggregator.java   | 47 ++++++++++++-------
 .../first/StringFirstAggregatorFactory.java        | 13 ++++--
 .../first/StringFirstBufferAggregator.java         | 54 ++++++++++++++++------
 .../aggregation/first/StringFirstLastUtils.java    | 35 +++++++++-----
 .../aggregation/last/StringLastAggregator.java     | 47 +++++++++++--------
 .../last/StringLastAggregatorFactory.java          | 14 ++++--
 .../last/StringLastBufferAggregator.java           | 54 ++++++++++++++++------
 .../first/StringFirstAggregationTest.java          |  8 +++-
 .../first/StringFirstBufferAggregatorTest.java     | 46 ++++++++++++++++--
 .../last/StringLastAggregationTest.java            |  5 ++
 .../last/StringLastBufferAggregatorTest.java       | 50 ++++++++++++++++++--
 13 files changed, 341 insertions(+), 95 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java 
b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index 7485802..33a4e3c 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -546,4 +546,39 @@ public class StringUtils
     return new String(data);
   }
 
+  /**
+   * Returns the string truncated to maxBytes.
+   * If given string input is shorter than maxBytes, then it remains the same.
+   *
+   * @param s        The input string to possibly be truncated
+   * @param maxBytes The max bytes that string input will be truncated to
+   *
+   * @return the string after truncated to maxBytes
+   */
+  @Nullable
+  public static String chop(@Nullable final String s, final int maxBytes)
+  {
+    if (s == null) {
+      return null;
+    } else {
+      // Shorten firstValue to what could fit in maxBytes as UTF-8.
+      final byte[] bytes = new byte[maxBytes];
+      final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
+      return new String(bytes, 0, len, StandardCharsets.UTF_8);
+    }
+  }
+
+  /**
+   * Shorten "s" to "maxBytes" chars. Fast and loose because these are *chars* 
not *bytes*. Use
+   * {@link #chop(String, int)} for slower, but accurate chopping.
+   */
+  @Nullable
+  public static String fastLooseChop(@Nullable final String s, final int 
maxBytes)
+  {
+    if (s == null || s.length() <= maxBytes) {
+      return s;
+    } else {
+      return s.substring(0, maxBytes);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java 
b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
index e9f5f21..1d4cb6d 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
@@ -246,4 +246,32 @@ public class StringUtilsTest
     Assert.assertEquals(s5, null);
   }
 
+  @Test
+  public void testChop()
+  {
+    Assert.assertEquals("foo", StringUtils.chop("foo", 5));
+    Assert.assertEquals("fo", StringUtils.chop("foo", 2));
+    Assert.assertEquals("", StringUtils.chop("foo", 0));
+    Assert.assertEquals("smile 🙂 for", StringUtils.chop("smile 🙂 for the 
camera", 14));
+    Assert.assertEquals("smile 🙂", StringUtils.chop("smile 🙂 for the camera", 
10));
+    Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 
9));
+    Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 
8));
+    Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 
7));
+    Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 
6));
+    Assert.assertEquals("smile", StringUtils.chop("smile 🙂 for the camera", 
5));
+  }
+
+  @Test
+  public void testFastLooseChop()
+  {
+    Assert.assertEquals("foo", StringUtils.fastLooseChop("foo", 5));
+    Assert.assertEquals("fo", StringUtils.fastLooseChop("foo", 2));
+    Assert.assertEquals("", StringUtils.fastLooseChop("foo", 0));
+    Assert.assertEquals("smile 🙂 for", StringUtils.fastLooseChop("smile 🙂 for 
the camera", 12));
+    Assert.assertEquals("smile 🙂 ", StringUtils.fastLooseChop("smile 🙂 for the 
camera", 9));
+    Assert.assertEquals("smile 🙂", StringUtils.fastLooseChop("smile 🙂 for the 
camera", 8));
+    Assert.assertEquals("smile \uD83D", StringUtils.fastLooseChop("smile 🙂 for 
the camera", 7));
+    Assert.assertEquals("smile ", StringUtils.fastLooseChop("smile 🙂 for the 
camera", 6));
+    Assert.assertEquals("smile", StringUtils.fastLooseChop("smile 🙂 for the 
camera", 5));
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
index 0260044..2d5ee99 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
@@ -20,32 +20,34 @@
 package org.apache.druid.query.aggregation.first;
 
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
-
-import javax.annotation.Nullable;
+import org.apache.druid.segment.DimensionHandlerUtils;
 
 public class StringFirstAggregator implements Aggregator
 {
-  @Nullable
   private final BaseLongColumnValueSelector timeSelector;
-  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseObjectColumnValueSelector<?> valueSelector;
   private final int maxStringBytes;
+  private final boolean needsFoldCheck;
 
   protected long firstTime;
   protected String firstValue;
 
   public StringFirstAggregator(
-      @Nullable BaseLongColumnValueSelector timeSelector,
-      BaseObjectColumnValueSelector valueSelector,
-      int maxStringBytes
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector<?> valueSelector,
+      int maxStringBytes,
+      boolean needsFoldCheck
   )
   {
     this.valueSelector = valueSelector;
     this.timeSelector = timeSelector;
     this.maxStringBytes = maxStringBytes;
+    this.needsFoldCheck = needsFoldCheck;
 
     firstTime = DateTimes.MAX.getMillis();
     firstValue = null;
@@ -54,17 +56,28 @@ public class StringFirstAggregator implements Aggregator
   @Override
   public void aggregate()
   {
-    final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
-        timeSelector,
-        valueSelector
-    );
+    if (needsFoldCheck) {
+      // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
+      // it's a foldable object).
+      final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
+          timeSelector,
+          valueSelector
+      );
+
+      if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
+        firstTime = inPair.lhs;
+        firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
+      }
+    } else {
+      final long time = timeSelector.getLong();
 
-    if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
-      firstTime = inPair.lhs;
-      firstValue = inPair.rhs;
+      if (time < firstTime) {
+        final String value = 
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
 
-      if (firstValue.length() > maxStringBytes) {
-        firstValue = firstValue.substring(0, maxStringBytes);
+        if (value != null) {
+          firstTime = time;
+          firstValue = StringUtils.fastLooseChop(value, maxStringBytes);
+        }
       }
     }
   }
@@ -72,7 +85,7 @@ public class StringFirstAggregator implements Aggregator
   @Override
   public Object get()
   {
-    return new SerializablePairLongString(firstTime, 
StringFirstLastUtils.chop(firstValue, maxStringBytes));
+    return new SerializablePairLongString(firstTime, 
StringUtils.chop(firstValue, maxStringBytes));
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 1b30bf7..6ad8558 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.column.ColumnHolder;
 
@@ -118,20 +119,24 @@ public class StringFirstAggregatorFactory extends 
AggregatorFactory
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
+    final BaseObjectColumnValueSelector<?> valueSelector = 
metricFactory.makeColumnValueSelector(fieldName);
     return new StringFirstAggregator(
         metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName),
-        maxStringBytes
+        valueSelector,
+        maxStringBytes,
+        StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, 
metricFactory.getColumnCapabilities(fieldName))
     );
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
   {
+    final BaseObjectColumnValueSelector<?> valueSelector = 
metricFactory.makeColumnValueSelector(fieldName);
     return new StringFirstBufferAggregator(
         metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName),
-        maxStringBytes
+        valueSelector,
+        maxStringBytes,
+        StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, 
metricFactory.getColumnCapabilities(fieldName))
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
index 5a4c006..b7d5ac8 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -25,6 +25,7 @@ import 
org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
 
 import java.nio.ByteBuffer;
 
@@ -36,18 +37,21 @@ public class StringFirstBufferAggregator implements 
BufferAggregator
   );
 
   private final BaseLongColumnValueSelector timeSelector;
-  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseObjectColumnValueSelector<?> valueSelector;
   private final int maxStringBytes;
+  private final boolean needsFoldCheck;
 
   public StringFirstBufferAggregator(
       BaseLongColumnValueSelector timeSelector,
-      BaseObjectColumnValueSelector valueSelector,
-      int maxStringBytes
+      BaseObjectColumnValueSelector<?> valueSelector,
+      int maxStringBytes,
+      boolean needsFoldCheck
   )
   {
     this.timeSelector = timeSelector;
     this.valueSelector = valueSelector;
     this.maxStringBytes = maxStringBytes;
+    this.needsFoldCheck = needsFoldCheck;
   }
 
   @Override
@@ -59,20 +63,40 @@ public class StringFirstBufferAggregator implements 
BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
-        timeSelector,
-        valueSelector
-    );
+    if (needsFoldCheck) {
+      // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
+      // it's a foldable object).
+      final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
+          timeSelector,
+          valueSelector
+      );
 
-    if (inPair != null && inPair.rhs != null) {
+      if (inPair != null && inPair.rhs != null) {
+        final long firstTime = buf.getLong(position);
+        if (inPair.lhs < firstTime) {
+          StringFirstLastUtils.writePair(
+              buf,
+              position,
+              new SerializablePairLongString(inPair.lhs, inPair.rhs),
+              maxStringBytes
+          );
+        }
+      }
+    } else {
+      final long time = timeSelector.getLong();
       final long firstTime = buf.getLong(position);
-      if (inPair.lhs < firstTime) {
-        StringFirstLastUtils.writePair(
-            buf,
-            position,
-            new SerializablePairLongString(inPair.lhs, inPair.rhs),
-            maxStringBytes
-        );
+
+      if (time < firstTime) {
+        final String value = 
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
+
+        if (value != null) {
+          StringFirstLastUtils.writePair(
+              buf,
+              position,
+              new SerializablePairLongString(time, value),
+              maxStringBytes
+          );
+        }
       }
     }
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 630f70c..910cb94 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -24,32 +24,45 @@ import 
org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 
 public class StringFirstLastUtils
 {
   private static final int NULL_VALUE = -1;
 
-  @Nullable
-  public static String chop(@Nullable final String s, final int maxBytes)
+  /**
+   * Returns whether a given value selector *might* contain 
SerializablePairLongString objects.
+   */
+  public static boolean selectorNeedsFoldCheck(
+      final BaseObjectColumnValueSelector<?> valueSelector,
+      @Nullable final ColumnCapabilities valueSelectorCapabilities
+  )
   {
-    if (s == null) {
-      return null;
-    } else {
-      // Shorten firstValue to what could fit in maxBytes as UTF-8.
-      final byte[] bytes = new byte[maxBytes];
-      final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
-      return new String(bytes, 0, len, StandardCharsets.UTF_8);
+    if (valueSelectorCapabilities != null && 
valueSelectorCapabilities.getType() != ValueType.COMPLEX) {
+      // Known, non-complex type.
+      return false;
     }
+
+    if (valueSelector instanceof NilColumnValueSelector) {
+      // Nil column, definitely no SerializablePairLongStrings.
+      return false;
+    }
+
+    // Check if the selector class could possibly be a 
SerializablePairLongString (either a superclass or subclass).
+    final Class<?> clazz = valueSelector.classOfObject();
+    return clazz.isAssignableFrom(SerializablePairLongString.class)
+           || SerializablePairLongString.class.isAssignableFrom(clazz);
   }
 
   @Nullable
   public static SerializablePairLongString readPairFromSelectors(
       final BaseLongColumnValueSelector timeSelector,
-      final BaseObjectColumnValueSelector valueSelector
+      final BaseObjectColumnValueSelector<?> valueSelector
   )
   {
     final long time;
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
index 01be5db..ea37ff4 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
@@ -20,30 +20,35 @@
 package org.apache.druid.query.aggregation.last;
 
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
 
 public class StringLastAggregator implements Aggregator
 {
   private final BaseLongColumnValueSelector timeSelector;
-  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseObjectColumnValueSelector<?> valueSelector;
   private final int maxStringBytes;
+  private final boolean needsFoldCheck;
 
   protected long lastTime;
   protected String lastValue;
 
   public StringLastAggregator(
-      BaseLongColumnValueSelector timeSelector,
-      BaseObjectColumnValueSelector valueSelector,
-      int maxStringBytes
+      final BaseLongColumnValueSelector timeSelector,
+      final BaseObjectColumnValueSelector<?> valueSelector,
+      final int maxStringBytes,
+      final boolean needsFoldCheck
   )
   {
     this.valueSelector = valueSelector;
     this.timeSelector = timeSelector;
     this.maxStringBytes = maxStringBytes;
+    this.needsFoldCheck = needsFoldCheck;
 
     lastTime = DateTimes.MIN.getMillis();
     lastValue = null;
@@ -52,22 +57,28 @@ public class StringLastAggregator implements Aggregator
   @Override
   public void aggregate()
   {
-    final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
-        timeSelector,
-        valueSelector
-    );
+    if (needsFoldCheck) {
+      // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
+      // it's a foldable object).
+      final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
+          timeSelector,
+          valueSelector
+      );
 
-    if (inPair == null) {
-      // Don't aggregate nulls.
-      return;
-    }
+      if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
+        lastTime = inPair.lhs;
+        lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
+      }
+    } else {
+      final long time = timeSelector.getLong();
 
-    if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
-      lastTime = inPair.lhs;
-      lastValue = inPair.rhs;
+      if (time >= lastTime) {
+        final String value = 
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
 
-      if (lastValue.length() > maxStringBytes) {
-        lastValue = lastValue.substring(0, maxStringBytes);
+        if (value != null) {
+          lastTime = time;
+          lastValue = StringUtils.fastLooseChop(value, maxStringBytes);
+        }
       }
     }
   }
@@ -75,7 +86,7 @@ public class StringLastAggregator implements Aggregator
   @Override
   public Object get()
   {
-    return new SerializablePairLongString(lastTime, 
StringFirstLastUtils.chop(lastValue, maxStringBytes));
+    return new SerializablePairLongString(lastTime, 
StringUtils.chop(lastValue, maxStringBytes));
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index 9277d05..9a3264f 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -32,7 +32,9 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.column.ColumnHolder;
 
@@ -74,20 +76,24 @@ public class StringLastAggregatorFactory extends 
AggregatorFactory
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
+    final BaseObjectColumnValueSelector<?> valueSelector = 
metricFactory.makeColumnValueSelector(fieldName);
     return new StringLastAggregator(
         metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName),
-        maxStringBytes
+        valueSelector,
+        maxStringBytes,
+        StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, 
metricFactory.getColumnCapabilities(fieldName))
     );
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
   {
+    final BaseObjectColumnValueSelector<?> valueSelector = 
metricFactory.makeColumnValueSelector(fieldName);
     return new StringLastBufferAggregator(
         metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName),
-        maxStringBytes
+        valueSelector,
+        maxStringBytes,
+        StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, 
metricFactory.getColumnCapabilities(fieldName))
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
index 30ea428..09e3276 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -26,6 +26,7 @@ import 
org.apache.druid.query.aggregation.first.StringFirstLastUtils;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
 
 import java.nio.ByteBuffer;
 
@@ -37,18 +38,21 @@ public class StringLastBufferAggregator implements 
BufferAggregator
   );
 
   private final BaseLongColumnValueSelector timeSelector;
-  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseObjectColumnValueSelector<?> valueSelector;
   private final int maxStringBytes;
+  private final boolean needsFoldCheck;
 
   public StringLastBufferAggregator(
       BaseLongColumnValueSelector timeSelector,
-      BaseObjectColumnValueSelector valueSelector,
-      int maxStringBytes
+      BaseObjectColumnValueSelector<?> valueSelector,
+      int maxStringBytes,
+      boolean needsFoldCheck
   )
   {
     this.timeSelector = timeSelector;
     this.valueSelector = valueSelector;
     this.maxStringBytes = maxStringBytes;
+    this.needsFoldCheck = needsFoldCheck;
   }
 
   @Override
@@ -60,20 +64,40 @@ public class StringLastBufferAggregator implements 
BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
-        timeSelector,
-        valueSelector
-    );
+    if (needsFoldCheck) {
+      // Less efficient code path when folding is a possibility (we must read 
the value selector first just in case
+      // it's a foldable object).
+      final SerializablePairLongString inPair = 
StringFirstLastUtils.readPairFromSelectors(
+          timeSelector,
+          valueSelector
+      );
 
-    if (inPair != null && inPair.rhs != null) {
+      if (inPair != null && inPair.rhs != null) {
+        final long lastTime = buf.getLong(position);
+        if (inPair.lhs >= lastTime) {
+          StringFirstLastUtils.writePair(
+              buf,
+              position,
+              new SerializablePairLongString(inPair.lhs, inPair.rhs),
+              maxStringBytes
+          );
+        }
+      }
+    } else {
+      final long time = timeSelector.getLong();
       final long lastTime = buf.getLong(position);
-      if (inPair.lhs >= lastTime) {
-        StringFirstLastUtils.writePair(
-            buf,
-            position,
-            new SerializablePairLongString(inPair.lhs, inPair.rhs),
-            maxStringBytes
-        );
+
+      if (time >= lastTime) {
+        final String value = 
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
+
+        if (value != null) {
+          StringFirstLastUtils.writePair(
+              buf,
+              position,
+              new SerializablePairLongString(time, value),
+              maxStringBytes
+          );
+        }
       }
     }
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
index 190c343..0e450d3 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -28,7 +28,9 @@ import 
org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.aggregation.TestLongColumnSelector;
 import org.apache.druid.query.aggregation.TestObjectColumnSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,6 +70,9 @@ public class StringFirstAggregationTest
     
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
     
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
+            .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
+    
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
   }
 
@@ -133,8 +138,7 @@ public class StringFirstAggregationTest
   @Test
   public void testStringFirstCombiningBufferAggregator()
   {
-    BufferAggregator agg = combiningAggFactory.factorizeBuffered(
-        colSelectorFactory);
+    BufferAggregator agg = 
combiningAggFactory.factorizeBuffered(colSelectorFactory);
 
     ByteBuffer buffer = ByteBuffer.wrap(new 
byte[stringFirstAggFactory.getMaxIntermediateSize()]);
     agg.init(buffer, 0);
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
index 04700fa..3b4ef69 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -46,7 +46,6 @@ public class StringFirstBufferAggregatorTest
   @Test
   public void testBufferAggregate()
   {
-
     final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 
1526725900L, 1526725000L};
     final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
     Integer maxStringBytes = 1024;
@@ -61,7 +60,8 @@ public class StringFirstBufferAggregatorTest
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
         longColumnSelector,
         objectColumnSelector,
-        maxStringBytes
+        maxStringBytes,
+        false
     );
 
     ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -78,7 +78,43 @@ public class StringFirstBufferAggregatorTest
 
     Assert.assertEquals("expected last string value", strings[0], sp.rhs);
     Assert.assertEquals("last string timestamp is the biggest", new 
Long(timestamps[0]), new Long(sp.lhs));
+  }
+
+  @Test
+  public void testBufferAggregateWithFoldCheck()
+  {
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 
1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes,
+        true
+    );
 
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, 
position));
+
+
+    Assert.assertEquals("expected last string value", strings[0], sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new 
Long(timestamps[0]), new Long(sp.lhs));
   }
 
   @Test
@@ -99,7 +135,8 @@ public class StringFirstBufferAggregatorTest
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
         longColumnSelector,
         objectColumnSelector,
-        maxStringBytes
+        maxStringBytes,
+        false
     );
 
     ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -137,7 +174,8 @@ public class StringFirstBufferAggregatorTest
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
         longColumnSelector,
         objectColumnSelector,
-        maxStringBytes
+        maxStringBytes,
+        false
     );
 
     ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
index 2e22f9c..39f9925 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.query.aggregation.TestLongColumnSelector;
 import org.apache.druid.query.aggregation.TestObjectColumnSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,6 +70,9 @@ public class StringLastAggregationTest
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
+            .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
+    EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
     EasyMock.replay(colSelectorFactory);
   }
 
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
index 18a3788..6c350c4 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -61,7 +61,8 @@ public class StringLastBufferAggregatorTest
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
         longColumnSelector,
         objectColumnSelector,
-        maxStringBytes
+        maxStringBytes,
+        false
     );
 
     ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -76,12 +77,49 @@ public class StringLastBufferAggregatorTest
     SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
 
 
-    Assert.assertEquals("expectec last string value", "DDDD", sp.rhs);
+    Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
     Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
 
   }
 
   @Test
+  public void testBufferAggregateWithFoldCheck()
+  {
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes,
+        true
+    );
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+    Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
+  }
+
+  @Test
   public void testNullBufferAggregate()
   {
 
@@ -99,7 +137,8 @@ public class StringLastBufferAggregatorTest
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
         longColumnSelector,
         objectColumnSelector,
-        maxStringBytes
+        maxStringBytes,
+        false
     );
 
     ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@@ -114,7 +153,7 @@ public class StringLastBufferAggregatorTest
     SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
 
 
-    Assert.assertEquals("expectec last string value", strings[2], sp.rhs);
+    Assert.assertEquals("expected last string value", strings[2], sp.rhs);
     Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs));
 
   }
@@ -137,7 +176,8 @@ public class StringLastBufferAggregatorTest
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
         longColumnSelector,
         objectColumnSelector,
-        maxStringBytes
+        maxStringBytes,
+        false
     );
 
     ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to