This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new e270362 Add stringLast and stringFirst aggregators extension (#5789)
e270362 is described below
commit e2703627676366655001900091282c83d31315a4
Author: Andrés Gómez <[email protected]>
AuthorDate: Wed Aug 1 19:52:54 2018 +0200
Add stringLast and stringFirst aggregators extension (#5789)
* Add lastString and firstString aggregators extension
* Remove duplicated class
* Move first-last-string doc page to extensions-contrib
* Fix ObjectStrategy compare method
* Fix doc bad aggregators type name
* Create FoldingAggregatorFactory classes to fix SegmentMetadataQuery
* Add getMaxStringBytes() method to support JSON serialization
* Fix null pointer exception at segment creation phase when the string value is null
* Control the valueSelector object class on BufferAggregators
* Perform all improvements
* Add java doc on SerializablePairLongStringSerde
* Refactor ObjectStrategy compare method
* Remove unused ;
* Add aggregateCombiner unit tests. Rename BufferAggregators unit tests
* Remove unused imports
* Add license header
* Add class name to java doc class serde
* Throw exception if value is unsupported class type
* Move first-last-string extension into druid core
* Update druid core docs
* Fix null pointer exception when pair->string is null
* Add null control unit tests
* Remove unused imports
* Add first/last string folding aggregator on AggregatorsModule to support segment metadata query
* Change SerializablePairLongString to extend SerializablePair
* Change vars from public to private
* Convert vars to primitive type
* Clarify compare comment
* Change IllegalStateException to ISE
* Remove TODO comments
* Control possible null pointer exception
* Add @Nullable annotation
* Remove empty line
* Remove unused parameter type
* Improve AggregatorCombiner javadocs
* Add filterNullValues option at StringLast and StringFirst aggregators
* Add filterNullValues option at agg documentation
* Fix checkstyle
* Update header license
* Fix StringFirstAggregatorFactory.VALUE_COMPARATOR
* Fix StringFirstAggregatorCombiner
* Fix if condition at StringFirstAggregateCombiner
* Remove filterNullValues from string first/last aggregators
* Add isReset flag in FirstAggregatorCombiner
* Change Arrays.asList to Collections.singletonList
---
docs/content/querying/aggregations.md | 32 +++-
.../java/io/druid/jackson/AggregatorsModule.java | 20 ++-
.../io/druid/query/aggregation/AggregatorUtil.java | 4 +
.../aggregation/SerializablePairLongString.java | 35 ++++
.../SerializablePairLongStringSerde.java | 146 ++++++++++++++++
.../first/LongFirstAggregatorFactory.java | 2 +-
.../first/StringFirstAggregateCombiner.java | 60 +++++++
.../aggregation/first/StringFirstAggregator.java | 110 ++++++++++++
...tory.java => StringFirstAggregatorFactory.java} | 163 ++++++++---------
.../first/StringFirstBufferAggregator.java | 157 +++++++++++++++++
.../first/StringFirstFoldingAggregatorFactory.java | 105 +++++++++++
.../last/LongLastAggregatorFactory.java | 2 +-
.../last/StringLastAggregateCombiner.java | 55 ++++++
.../aggregation/last/StringLastAggregator.java | 110 ++++++++++++
...ctory.java => StringLastAggregatorFactory.java} | 122 +++++--------
.../last/StringLastBufferAggregator.java | 157 +++++++++++++++++
.../last/StringLastFoldingAggregatorFactory.java | 102 +++++++++++
.../first/StringFirstAggregationTest.java | 194 +++++++++++++++++++++
.../first/StringFirstBufferAggregatorTest.java | 171 ++++++++++++++++++
.../first/StringFirstTimeseriesQueryTest.java | 123 +++++++++++++
.../last/StringLastAggregationTest.java | 194 +++++++++++++++++++++
.../last/StringLastBufferAggregatorTest.java | 171 ++++++++++++++++++
.../last/StringLastTimeseriesQueryTest.java | 126 +++++++++++++
23 files changed, 2197 insertions(+), 164 deletions(-)
diff --git a/docs/content/querying/aggregations.md b/docs/content/querying/aggregations.md
index b0ce5cc..3f6b5e7 100644
--- a/docs/content/querying/aggregations.md
+++ b/docs/content/querying/aggregations.md
@@ -102,7 +102,7 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to
### First / Last aggregator
-First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
+(Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data.
@@ -178,6 +178,36 @@ Note that queries with first/last aggregators on a segment created with rollup e
}
```
+#### `stringFirst` aggregator
+
+`stringFirst` computes the metric value with the minimum timestamp or `null` if no rows exist
+
+```json
+{
+ "type" : "stringFirst",
+ "name" : <output_name>,
+ "fieldName" : <metric_name>,
+ "maxStringBytes" : <integer> # (optional, defaults to 1024),
+ "filterNullValues" : <boolean> # (optional, defaults to false)
+}
+```
+
+
+
+#### `stringLast` aggregator
+
+`stringLast` computes the metric value with the maximum timestamp or `null` if no rows exist
+
+```json
+{
+ "type" : "stringLast",
+ "name" : <output_name>,
+ "fieldName" : <metric_name>,
+ "maxStringBytes" : <integer> # (optional, defaults to 1024),
+ "filterNullValues" : <boolean> # (optional, defaults to false)
+}
+```
+
### JavaScript aggregator
Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your
diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
index 94deda0..d3cc9a7 100644
--- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
@@ -38,10 +38,13 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.SerializablePairLongStringSerde;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
@@ -49,6 +52,8 @@ import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.query.aggregation.last.FloatLastAggregatorFactory;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
+import io.druid.query.aggregation.last.StringLastAggregatorFactory;
+import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -74,7 +79,14 @@ public class AggregatorsModule extends SimpleModule
}
if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) {
- ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
+ ComplexMetrics.registerSerde(
+ "preComputedHyperUnique",
+ new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())
+ );
+ }
+
+ if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) {
+ ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
}
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
@@ -101,9 +113,13 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatFirst", value = FloatFirstAggregatorFactory.class),
+ @JsonSubTypes.Type(name = "stringFirst", value = StringFirstAggregatorFactory.class),
+ @JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
- @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class)
+ @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
+ @JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
+ @JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)
})
public interface AggregatorFactoryMixin
{
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
index 610e6ba..eedd095 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
@@ -94,6 +94,10 @@ public class AggregatorUtil
public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x2A;
+ // StringFirst, StringLast aggregator
+ public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
+ public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;
+
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*
diff --git a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
new file mode 100644
index 0000000..91f9b26
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.collections.SerializablePair;
+
+public class SerializablePairLongString extends SerializablePair<Long, String>
+{
+ @JsonCreator
+ public SerializablePairLongString(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") String rhs)
+ {
+ super(lhs, rhs);
+ }
+}
+
+
diff --git a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
new file mode 100644
index 0000000..ca245fa
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.segment.GenericColumnSerializer;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import io.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString).
+ * The serialization structure is: Long:Integer:String
+ * <p>
+ * The class is used on first/last String aggregators to store the time and the first/last string.
+ * Long:Integer:String -> Timestamp:StringSize:StringData
+ */
+public class SerializablePairLongStringSerde extends ComplexMetricSerde
+{
+
+ private static final String TYPE_NAME = "serializablePairLongString";
+
+ @Override
+ public String getTypeName()
+ {
+ return TYPE_NAME;
+ }
+
+ @Override
+ public ComplexMetricExtractor getExtractor()
+ {
+ return new ComplexMetricExtractor()
+ {
+ @Override
+ public Class<SerializablePairLongString> extractedClass()
+ {
+ return SerializablePairLongString.class;
+ }
+
+ @Override
+ public Object extractValue(InputRow inputRow, String metricName)
+ {
+ return inputRow.getRaw(metricName);
+ }
+ };
+ }
+
+ @Override
+ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
+ {
+ final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
+ columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
+ }
+
+ @Override
+ public ObjectStrategy getObjectStrategy()
+ {
+ return new ObjectStrategy<SerializablePairLongString>()
+ {
+ @Override
+ public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2)
+ {
+ return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2);
+ }
+
+ @Override
+ public Class<? extends SerializablePairLongString> getClazz()
+ {
+ return SerializablePairLongString.class;
+ }
+
+ @Override
+ public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
+ {
+ final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
+
+ long lhs = readOnlyBuffer.getLong();
+ int stringSize = readOnlyBuffer.getInt();
+
+ String lastString = null;
+ if (stringSize > 0) {
+ byte[] stringBytes = new byte[stringSize];
+ readOnlyBuffer.get(stringBytes, 0, stringSize);
+ lastString = StringUtils.fromUtf8(stringBytes);
+ }
+
+ return new SerializablePairLongString(lhs, lastString);
+ }
+
+ @Override
+ public byte[] toBytes(SerializablePairLongString val)
+ {
+ String rhsString = val.rhs;
+ ByteBuffer bbuf;
+
+ if (rhsString != null) {
+ byte[] rhsBytes = StringUtils.toUtf8(rhsString);
+ bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
+ bbuf.putLong(val.lhs);
+ bbuf.putInt(Long.BYTES, rhsBytes.length);
+ bbuf.position(Long.BYTES + Integer.BYTES);
+ bbuf.put(rhsBytes);
+ } else {
+ bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+ bbuf.putLong(val.lhs);
+ bbuf.putInt(Long.BYTES, 0);
+ }
+
+ return bbuf.array();
+ }
+ };
+ }
+
+ @Override
+ public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
+ {
+ return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+ }
+}
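Note (not part of the patch): the javadoc of `SerializablePairLongStringSerde` describes the serialized layout as Timestamp:StringSize:StringData. The following is a minimal stand-alone sketch of that layout using only `java.nio`, relying on the default big-endian byte order of `ByteBuffer`. `PairLayoutSketch` and its method names are hypothetical and do not appear in the commit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone helper for illustration; not part of the commit.
final class PairLayoutSketch
{
  // Writes the timestamp (8 bytes), the UTF-8 length (4 bytes), then the UTF-8 bytes.
  // A null string is written with length 0 and no payload.
  static byte[] encode(long timestamp, String value)
  {
    byte[] payload = value == null ? new byte[0] : value.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + payload.length);
    buf.putLong(timestamp);
    buf.putInt(payload.length);
    buf.put(payload);
    return buf.array();
  }

  // Reads the leading 8-byte timestamp.
  static long decodeTimestamp(byte[] bytes)
  {
    return ByteBuffer.wrap(bytes).getLong();
  }

  // Reads the string part; a stored length of 0 is decoded as null.
  static String decodeString(byte[] bytes)
  {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    buf.getLong(); // skip the timestamp
    int size = buf.getInt();
    if (size <= 0) {
      return null;
    }
    byte[] payload = new byte[size];
    buf.get(payload);
    return new String(payload, StandardCharsets.UTF_8);
  }
}
```

As in the serde in this patch, a stored length of 0 decodes to `null`, so an empty string and `null` cannot be told apart after a round trip.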
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
index 56d8aed..32b575f 100644
--- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
-import io.druid.java.util.common.StringUtils;
import io.druid.collections.SerializablePair;
+import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
new file mode 100644
index 0000000..20487f6
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.ObjectAggregateCombiner;
+import io.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String>
+{
+ private String firstString;
+ private boolean isReset = false;
+
+ @Override
+ public void reset(ColumnValueSelector selector)
+ {
+ firstString = (String) selector.getObject();
+ isReset = true;
+ }
+
+ @Override
+ public void fold(ColumnValueSelector selector)
+ {
+ if (!isReset) {
+ firstString = (String) selector.getObject();
+ isReset = true;
+ }
+ }
+
+ @Nullable
+ @Override
+ public String getObject()
+ {
+ return firstString;
+ }
+
+ @Override
+ public Class<String> classOfObject()
+ {
+ return String.class;
+ }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
new file mode 100644
index 0000000..5710a61
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.ISE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+public class StringFirstAggregator implements Aggregator
+{
+
+ private final BaseObjectColumnValueSelector valueSelector;
+ private final BaseLongColumnValueSelector timeSelector;
+ private final int maxStringBytes;
+
+ protected long firstTime;
+ protected String firstValue;
+
+ public StringFirstAggregator(
+ BaseLongColumnValueSelector timeSelector,
+ BaseObjectColumnValueSelector valueSelector,
+ int maxStringBytes
+ )
+ {
+ this.valueSelector = valueSelector;
+ this.timeSelector = timeSelector;
+ this.maxStringBytes = maxStringBytes;
+
+ firstTime = Long.MAX_VALUE;
+ firstValue = null;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ long time = timeSelector.getLong();
+ if (time < firstTime) {
+ firstTime = time;
+ Object value = valueSelector.getObject();
+
+ if (value != null) {
+ if (value instanceof String) {
+ firstValue = (String) value;
+ } else if (value instanceof SerializablePairLongString) {
+ firstValue = ((SerializablePairLongString) value).rhs;
+ } else {
+ throw new ISE(
+ "Try to aggregate unsupported class type [%s]. Supported class types: String or SerializablePairLongString",
+ value.getClass().getCanonicalName()
+ );
+ }
+
+ if (firstValue != null && firstValue.length() > maxStringBytes) {
+ firstValue = firstValue.substring(0, maxStringBytes);
+ }
+ } else {
+ firstValue = null;
+ }
+ }
+ }
+
+ @Override
+ public Object get()
+ {
+ return new SerializablePairLongString(firstTime, firstValue);
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
+ }
+
+ @Override
+ public double getDouble()
+ {
+ throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
+ }
+
+ @Override
+ public void close()
+ {
+ // no resources to cleanup
+ }
+}
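Note (not part of the patch): `StringFirstAggregator.aggregate()` keeps the value from the row with the smallest timestamp and truncates it. A minimal stand-alone sketch of that rule follows, assuming plain `long` timestamps and `String` values in place of Druid's column selectors. `FirstStringSketch`, `offer`, `time`, and `value` are hypothetical names. Like the patch, it truncates by `String.length()` (UTF-16 code units), which is not the same as a UTF-8 byte count.

```java
// Hypothetical stand-alone helper for illustration; not part of the commit.
final class FirstStringSketch
{
  private final int maxChars;
  private long firstTime = Long.MAX_VALUE; // sentinel: no row seen yet
  private String firstValue = null;

  FirstStringSketch(int maxChars)
  {
    this.maxChars = maxChars;
  }

  // Replaces the tracked value only when the row is strictly earlier than the current one.
  // A null value on an earlier row overwrites a previously tracked non-null value.
  void offer(long time, String value)
  {
    if (time < firstTime) {
      firstTime = time;
      if (value != null && value.length() > maxChars) {
        firstValue = value.substring(0, maxChars);
      } else {
        firstValue = value;
      }
    }
  }

  long time()
  {
    return firstTime;
  }

  String value()
  {
    return firstValue;
  }
}
```

Because the comparison is strict, rows that share the smallest timestamp do not replace the value that was seen first, and a row whose timestamp equals `Long.MAX_VALUE` is never selected.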
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
similarity index 52%
copy from processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
copy to processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 56d8aed..187e891 100644
--- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -21,66 +21,108 @@ package io.druid.query.aggregation.first;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
-import io.druid.java.util.common.StringUtils;
-import io.druid.collections.SerializablePair;
-import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
-import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.cache.CacheKeyBuilder;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
-public class LongFirstAggregatorFactory extends AggregatorFactory
+@JsonTypeName("stringFirst")
+public class StringFirstAggregatorFactory extends AggregatorFactory
{
- public static final Comparator VALUE_COMPARATOR = (o1, o2) -> Longs.compare(
- ((SerializablePair<Long, Long>) o1).rhs,
- ((SerializablePair<Long, Long>) o2).rhs
+ public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+ public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
+ ((SerializablePairLongString) o1).lhs,
+ ((SerializablePairLongString) o2).lhs
);
+ public static final Comparator<SerializablePairLongString> VALUE_COMPARATOR = (o1, o2) -> {
+ int comparation;
+
+ // First we check if the objects are null
+ if (o1 == null && o2 == null) {
+ comparation = 0;
+ } else if (o1 == null) {
+ comparation = -1;
+ } else if (o2 == null) {
+ comparation = 1;
+ } else {
+
+ // If the objects are not null, we will try to compare using timestamp
+ comparation = o1.lhs.compareTo(o2.lhs);
+
+ // If both timestamp are the same, we try to compare the Strings
+ if (comparation == 0) {
+
+ // First we check if the strings are null
+ if (o1.rhs == null && o2.rhs == null) {
+ comparation = 0;
+ } else if (o1.rhs == null) {
+ comparation = -1;
+ } else if (o2.rhs == null) {
+ comparation = 1;
+ } else {
+
+ // If the strings are not null, we will compare them
+ // Note: This comparation maybe doesn't make sense to first/last aggregators
+ comparation = o1.rhs.compareTo(o2.rhs);
+ }
+ }
+ }
+
+ return comparation;
+ };
+
private final String fieldName;
private final String name;
+ protected final int maxStringBytes;
@JsonCreator
- public LongFirstAggregatorFactory(
+ public StringFirstAggregatorFactory(
@JsonProperty("name") String name,
- @JsonProperty("fieldName") final String fieldName
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("maxStringBytes") Integer maxStringBytes
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
-
this.name = name;
this.fieldName = fieldName;
+ this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : maxStringBytes;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return new LongFirstAggregator(
+ return new StringFirstAggregator(
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName)
+ metricFactory.makeColumnValueSelector(fieldName),
+ maxStringBytes
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
- return new LongFirstBufferAggregator(
+ return new StringFirstBufferAggregator(
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName)
+ metricFactory.makeColumnValueSelector(fieldName),
+ maxStringBytes
);
}
@@ -93,82 +135,38 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
@Override
public Object combine(Object lhs, Object rhs)
{
- return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
+ return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
- throw new UOE("LongFirstAggregatorFactory is not supported during ingestion for rollup");
+ return new StringFirstAggregateCombiner();
}
@Override
public AggregatorFactory getCombiningFactory()
{
- return new LongFirstAggregatorFactory(name, name)
- {
- @Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory)
- {
- final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(name);
- return new LongFirstAggregator(null, null)
- {
- @Override
- public void aggregate()
- {
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.getObject();
- if (pair.lhs < firstTime) {
- firstTime = pair.lhs;
- firstValue = pair.rhs;
- }
- }
- };
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
- {
- final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(name);
- return new LongFirstBufferAggregator(null, null)
- {
- @Override
- public void aggregate(ByteBuffer buf, int position)
- {
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.getObject();
- long firstTime = buf.getLong(position);
- if (pair.lhs < firstTime) {
- buf.putLong(position, pair.lhs);
- buf.putLong(position + Long.BYTES, pair.rhs);
- }
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("selector", selector);
- }
- };
- }
- };
+ return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
- return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName));
+ return Collections.singletonList(new StringFirstAggregatorFactory(fieldName, fieldName, maxStringBytes));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
- return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue());
+ return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs")));
}
@Override
public Object finalizeComputation(Object object)
{
- return ((SerializablePair<Long, Long>) object).rhs;
+ return ((SerializablePairLongString) object).rhs;
}
@Override
@@ -184,6 +182,12 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
return fieldName;
}
+ @JsonProperty
+ public Integer getMaxStringBytes()
+ {
+ return maxStringBytes;
+ }
+
@Override
public List<String> requiredFields()
{
@@ -193,24 +197,22 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
@Override
public byte[] getCacheKey()
{
- byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
- return ByteBuffer.allocate(1 + fieldNameBytes.length)
- .put(AggregatorUtil.LONG_FIRST_CACHE_TYPE_ID)
- .put(fieldNameBytes)
- .array();
+ return new CacheKeyBuilder(AggregatorUtil.STRING_FIRST_CACHE_TYPE_ID)
+ .appendString(fieldName)
+ .appendInt(maxStringBytes)
+ .build();
}
@Override
public String getTypeName()
{
- return "long";
+ return "serializablePairLongString";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES * 2;
+ return Long.BYTES + Integer.BYTES + maxStringBytes;
}
@Override
@@ -223,25 +225,24 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
return false;
}
- LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o;
+ StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o;
- return fieldName.equals(that.fieldName) && name.equals(that.name);
+ return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes;
}
@Override
public int hashCode()
{
- int result = name.hashCode();
- result = 31 * result + fieldName.hashCode();
- return result;
+ return Objects.hash(name, fieldName, maxStringBytes);
}
@Override
public String toString()
{
- return "LongFirstAggregatorFactory{" +
+ return "StringFirstAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
+ ", maxStringBytes=" + maxStringBytes +
'}';
}
}
diff --git
a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
new file mode 100644
index 0000000..c71cfbf
--- /dev/null
+++
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstBufferAggregator implements BufferAggregator
+{
+ private final BaseLongColumnValueSelector timeSelector;
+ private final BaseObjectColumnValueSelector valueSelector;
+ private final int maxStringBytes;
+
+ public StringFirstBufferAggregator(
+ BaseLongColumnValueSelector timeSelector,
+ BaseObjectColumnValueSelector valueSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ this.maxStringBytes = maxStringBytes;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.putInt(position + Long.BYTES, 0);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ Object value = valueSelector.getObject();
+
+ long time = timeSelector.getLong();
+ String firstString = null;
+
+ if (value != null) {
+ if (value instanceof SerializablePairLongString) {
+ SerializablePairLongString serializablePair = (SerializablePairLongString) value;
+ time = serializablePair.lhs;
+ firstString = serializablePair.rhs;
+ } else if (value instanceof String) {
+ firstString = (String) value;
+ } else {
+ throw new ISE(
+ "Tried to aggregate unsupported class type [%s]. Supported class types: String or SerializablePairLongString",
+ value.getClass().getCanonicalName()
+ );
+ }
+ }
+
+ long lastTime = mutationBuffer.getLong(position);
+
+ if (time < lastTime) {
+ if (firstString != null) {
+ if (firstString.length() > maxStringBytes) {
+ firstString = firstString.substring(0, maxStringBytes);
+ }
+
+ byte[] valueBytes = StringUtils.toUtf8(firstString);
+
+ mutationBuffer.putLong(position, time);
+ mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+
+ mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+ mutationBuffer.put(valueBytes);
+ } else {
+ mutationBuffer.putLong(position, time);
+ mutationBuffer.putInt(position + Long.BYTES, 0);
+ }
+ }
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ Long timeValue = mutationBuffer.getLong(position);
+ int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
+
+ SerializablePairLongString serializablePair;
+
+ if (stringSizeBytes > 0) {
+ byte[] valueBytes = new byte[stringSizeBytes];
+ mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+ mutationBuffer.get(valueBytes, 0, stringSizeBytes);
+ serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
+ } else {
+ serializablePair = new SerializablePairLongString(timeValue, null);
+ }
+
+ return serializablePair;
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
+ }
+
+ @Override
+ public double getDouble(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
+ }
+
+ @Override
+ public void close()
+ {
+ // no resources to cleanup
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("timeSelector", timeSelector);
+ inspector.visit("valueSelector", valueSelector);
+ }
+}
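
The buffer aggregator above stores each intermediate value as an 8-byte timestamp, a 4-byte length, and then the UTF-8 bytes of the string. The stand-alone sketch below illustrates that layout outside Druid. The class PairLayoutSketch and its method names are hypothetical and not part of this commit; it uses only java.nio and assumes the caller has reserved enough room after the position.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not part of the commit.
class PairLayoutSketch
{
  // Writes [long time][int length][UTF-8 bytes] starting at an absolute position.
  static void write(ByteBuffer buf, int position, long time, String value)
  {
    buf.putLong(position, time);
    if (value == null) {
      // A null value is recorded as a zero length with no payload.
      buf.putInt(position + Long.BYTES, 0);
      return;
    }
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    buf.putInt(position + Long.BYTES, bytes.length);
    // Use a duplicate so the caller's position and limit stay untouched.
    ByteBuffer dup = buf.duplicate();
    dup.position(position + Long.BYTES + Integer.BYTES);
    dup.put(bytes);
  }

  static long readTime(ByteBuffer buf, int position)
  {
    return buf.getLong(position);
  }

  // Returns null when the stored length is zero, mirroring get() above.
  static String readValue(ByteBuffer buf, int position)
  {
    int length = buf.getInt(position + Long.BYTES);
    if (length <= 0) {
      return null;
    }
    byte[] bytes = new byte[length];
    ByteBuffer dup = buf.duplicate();
    dup.position(position + Long.BYTES + Integer.BYTES);
    dup.get(bytes, 0, length);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(64);
    write(buf, 4, 1000L, "abc");
    System.out.println(readTime(buf, 4) + " -> " + readValue(buf, 4));
  }
}
```

One consequence of this layout is that a stored length of 0 is read back as null, so an empty string and a null value cannot be told apart after a round trip.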
diff --git
a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
new file mode 100644
index 0000000..b268baf
--- /dev/null
+++
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import java.nio.ByteBuffer;
+
+@JsonTypeName("stringFirstFold")
+public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory
+{
+ public StringFirstFoldingAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("maxStringBytes") Integer maxStringBytes
+ )
+ {
+ super(name, fieldName, maxStringBytes);
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
+ {
+ final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+ return new StringFirstAggregator(null, null, maxStringBytes)
+ {
+ @Override
+ public void aggregate()
+ {
+ SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+ if (pair != null && pair.lhs < firstTime) {
+ firstTime = pair.lhs;
+ firstValue = pair.rhs;
+ }
+ }
+ };
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+ {
+ final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+ return new StringFirstBufferAggregator(null, null, maxStringBytes)
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+
+ if (pair != null && pair.lhs != null) {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ long lastTime = mutationBuffer.getLong(position);
+
+ if (pair.lhs < lastTime) {
+ mutationBuffer.putLong(position, pair.lhs);
+
+ if (pair.rhs != null) {
+ byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
+
+ mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+ mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+ mutationBuffer.put(valueBytes);
+ } else {
+ mutationBuffer.putInt(position + Long.BYTES, 0);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("selector", selector);
+ }
+ };
+ }
+
+}
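
The folding factory above merges already-aggregated (timestamp, string) pairs by keeping the pair with the smallest timestamp. A minimal sketch of that fold over a plain list follows. FirstFoldSketch and TimedString are hypothetical stand-ins for the Druid classes, and the initial timestamp of Long.MAX_VALUE is an assumption taken from StringFirstBufferAggregator.init().

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the commit.
class FirstFoldSketch
{
  // Stand-in for SerializablePairLongString: lhs is the time, rhs the value.
  static final class TimedString
  {
    final long time;
    final String value;

    TimedString(long time, String value)
    {
      this.time = time;
      this.value = value;
    }
  }

  // Keeps the pair with the smallest timestamp; null pairs are skipped.
  static TimedString foldFirst(List<TimedString> pairs)
  {
    long firstTime = Long.MAX_VALUE;
    String firstValue = null;
    for (TimedString pair : pairs) {
      // Strict '<' means that on equal timestamps the pair seen first wins.
      if (pair != null && pair.time < firstTime) {
        firstTime = pair.time;
        firstValue = pair.value;
      }
    }
    return new TimedString(firstTime, firstValue);
  }

  public static void main(String[] args)
  {
    TimedString result = foldFirst(Arrays.asList(
        new TimedString(5L, "b"),
        new TimedString(3L, "a"),
        new TimedString(3L, "c")
    ));
    System.out.println(result.time + " -> " + result.value);
  }
}
```

The "last" variant in this commit uses '>=' instead, so on equal timestamps the pair seen most recently replaces the stored one.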
diff --git
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
index ff33195..dc186a9 100644
---
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++
b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
@@ -22,8 +22,8 @@ package io.druid.query.aggregation.last;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import io.druid.java.util.common.StringUtils;
import io.druid.collections.SerializablePair;
+import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
diff --git
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
new file mode 100644
index 0000000..6625f08
--- /dev/null
+++
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.ObjectAggregateCombiner;
+import io.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class StringLastAggregateCombiner extends ObjectAggregateCombiner<String>
+{
+ private String lastString;
+
+ @Override
+ public void reset(ColumnValueSelector selector)
+ {
+ lastString = (String) selector.getObject();
+ }
+
+ @Override
+ public void fold(ColumnValueSelector selector)
+ {
+ lastString = (String) selector.getObject();
+ }
+
+ @Nullable
+ @Override
+ public String getObject()
+ {
+ return lastString;
+ }
+
+ @Override
+ public Class<String> classOfObject()
+ {
+ return String.class;
+ }
+}
diff --git
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
new file mode 100644
index 0000000..85cd0dd
--- /dev/null
+++
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.ISE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+public class StringLastAggregator implements Aggregator
+{
+
+ private final BaseObjectColumnValueSelector valueSelector;
+ private final BaseLongColumnValueSelector timeSelector;
+ private final int maxStringBytes;
+
+ protected long lastTime;
+ protected String lastValue;
+
+ public StringLastAggregator(
+ BaseLongColumnValueSelector timeSelector,
+ BaseObjectColumnValueSelector valueSelector,
+ int maxStringBytes
+ )
+ {
+ this.valueSelector = valueSelector;
+ this.timeSelector = timeSelector;
+ this.maxStringBytes = maxStringBytes;
+
+ lastTime = Long.MIN_VALUE;
+ lastValue = null;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ long time = timeSelector.getLong();
+ if (time >= lastTime) {
+ lastTime = time;
+ Object value = valueSelector.getObject();
+
+ if (value != null) {
+ if (value instanceof String) {
+ lastValue = (String) value;
+ } else if (value instanceof SerializablePairLongString) {
+ lastValue = ((SerializablePairLongString) value).rhs;
+ } else {
+ throw new ISE(
+ "Tried to aggregate unsupported class type [%s]. Supported class types: String or SerializablePairLongString",
+ value.getClass().getCanonicalName()
+ );
+ }
+
+ if (lastValue != null && lastValue.length() > maxStringBytes) {
+ lastValue = lastValue.substring(0, maxStringBytes);
+ }
+ } else {
+ lastValue = null;
+ }
+ }
+ }
+
+ @Override
+ public Object get()
+ {
+ return new SerializablePairLongString(lastTime, lastValue);
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("StringLastAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException("StringLastAggregator does not support getLong()");
+ }
+
+ @Override
+ public double getDouble()
+ {
+ throw new UnsupportedOperationException("StringLastAggregator does not support getDouble()");
+ }
+
+ @Override
+ public void close()
+ {
+ // no resources to cleanup
+ }
+}
diff --git
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
similarity index 52%
copy from
processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
copy to
processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
index ff33195..cb4f363 100644
---
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -21,23 +21,19 @@ package io.druid.query.aggregation.last;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
-import io.druid.java.util.common.StringUtils;
-import io.druid.collections.SerializablePair;
-import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
-import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
-import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
-import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.query.cache.CacheKeyBuilder;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -45,126 +41,91 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class LongLastAggregatorFactory extends AggregatorFactory
+
+@JsonTypeName("stringLast")
+public class StringLastAggregatorFactory extends AggregatorFactory
{
private final String fieldName;
private final String name;
+ protected final int maxStringBytes;
@JsonCreator
- public LongLastAggregatorFactory(
+ public StringLastAggregatorFactory(
@JsonProperty("name") String name,
- @JsonProperty("fieldName") final String fieldName
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("maxStringBytes") Integer maxStringBytes
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator
name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null
fieldName");
this.name = name;
this.fieldName = fieldName;
+ this.maxStringBytes = maxStringBytes == null
+ ? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
+ : maxStringBytes;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return new LongLastAggregator(
+ return new StringLastAggregator(
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName)
+ metricFactory.makeColumnValueSelector(fieldName),
+ maxStringBytes
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new LongLastBufferAggregator(
+ return new StringLastBufferAggregator(
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
- metricFactory.makeColumnValueSelector(fieldName)
+ metricFactory.makeColumnValueSelector(fieldName),
+ maxStringBytes
);
}
@Override
public Comparator getComparator()
{
- return LongFirstAggregatorFactory.VALUE_COMPARATOR;
+ return StringFirstAggregatorFactory.VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
- return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0
? lhs : rhs;
+ return StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
- throw new UOE("LongLastAggregatorFactory is not supported during ingestion
for rollup");
+ return new StringLastAggregateCombiner();
}
@Override
public AggregatorFactory getCombiningFactory()
{
- return new LongLastAggregatorFactory(name, name)
- {
- @Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory)
- {
- final BaseObjectColumnValueSelector selector =
metricFactory.makeColumnValueSelector(name);
- return new LongLastAggregator(null, null)
- {
- @Override
- public void aggregate()
- {
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>)
selector.getObject();
- if (pair.lhs >= lastTime) {
- lastTime = pair.lhs;
- lastValue = pair.rhs;
- }
- }
- };
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
- {
- final BaseObjectColumnValueSelector selector =
metricFactory.makeColumnValueSelector(name);
- return new LongLastBufferAggregator(null, null)
- {
- @Override
- public void aggregate(ByteBuffer buf, int position)
- {
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>)
selector.getObject();
- long lastTime = buf.getLong(position);
- if (pair.lhs >= lastTime) {
- buf.putLong(position, pair.lhs);
- buf.putLong(position + Long.BYTES, pair.rhs);
- }
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("selector", selector);
- }
- };
- }
- };
+ return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
- return Collections.singletonList(new LongLastAggregatorFactory(fieldName,
fieldName));
+ return Collections.singletonList(new StringLastAggregatorFactory(fieldName, fieldName, maxStringBytes));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
- return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).longValue());
+ return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs")));
}
@Override
public Object finalizeComputation(Object object)
{
- return ((SerializablePair<Long, Long>) object).rhs;
+ return ((SerializablePairLongString) object).rhs;
}
@Override
@@ -180,6 +141,12 @@ public class LongLastAggregatorFactory extends
AggregatorFactory
return fieldName;
}
+ @JsonProperty
+ public Integer getMaxStringBytes()
+ {
+ return maxStringBytes;
+ }
+
@Override
public List<String> requiredFields()
{
@@ -189,24 +156,22 @@ public class LongLastAggregatorFactory extends
AggregatorFactory
@Override
public byte[] getCacheKey()
{
- byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
- return ByteBuffer.allocate(1 + fieldNameBytes.length)
- .put(AggregatorUtil.LONG_LAST_CACHE_TYPE_ID)
- .put(fieldNameBytes)
- .array();
+ return new CacheKeyBuilder(AggregatorUtil.STRING_LAST_CACHE_TYPE_ID)
+ .appendString(fieldName)
+ .appendInt(maxStringBytes)
+ .build();
}
@Override
public String getTypeName()
{
- return "long";
+ return "serializablePairLongString";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES * 2;
+ return Long.BYTES + Integer.BYTES + maxStringBytes;
}
@Override
@@ -219,23 +184,24 @@ public class LongLastAggregatorFactory extends
AggregatorFactory
return false;
}
- LongLastAggregatorFactory that = (LongLastAggregatorFactory) o;
+ StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
- return name.equals(that.name) && fieldName.equals(that.fieldName);
+ return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes;
}
@Override
public int hashCode()
{
- return Objects.hash(name, fieldName);
+ return Objects.hash(name, fieldName, maxStringBytes);
}
@Override
public String toString()
{
- return "LongLastAggregatorFactory{" +
+ return "StringLastAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
+ ", maxStringBytes=" + maxStringBytes +
'}';
}
}
diff --git
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
new file mode 100644
index 0000000..12c9948
--- /dev/null
+++
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+public class StringLastBufferAggregator implements BufferAggregator
+{
+ private final BaseLongColumnValueSelector timeSelector;
+ private final BaseObjectColumnValueSelector valueSelector;
+ private final int maxStringBytes;
+
+ public StringLastBufferAggregator(
+ BaseLongColumnValueSelector timeSelector,
+ BaseObjectColumnValueSelector valueSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ this.maxStringBytes = maxStringBytes;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MIN_VALUE);
+ buf.putInt(position + Long.BYTES, 0);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ Object value = valueSelector.getObject();
+
+ long time = timeSelector.getLong();
+ String lastString = null;
+
+ if (value != null) {
+ if (value instanceof SerializablePairLongString) {
+ SerializablePairLongString serializablePair = (SerializablePairLongString) value;
+ time = serializablePair.lhs;
+ lastString = serializablePair.rhs;
+ } else if (value instanceof String) {
+ lastString = (String) value;
+ } else {
+ throw new ISE(
+ "Tried to aggregate unsupported class type [%s]. Supported class types: String or SerializablePairLongString",
+ value.getClass().getCanonicalName()
+ );
+ }
+ }
+
+ long lastTime = mutationBuffer.getLong(position);
+
+ if (time >= lastTime) {
+ if (lastString != null) {
+ if (lastString.length() > maxStringBytes) {
+ lastString = lastString.substring(0, maxStringBytes);
+ }
+
+ byte[] valueBytes = StringUtils.toUtf8(lastString);
+
+ mutationBuffer.putLong(position, time);
+ mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+
+ mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+ mutationBuffer.put(valueBytes);
+ } else {
+ mutationBuffer.putLong(position, time);
+ mutationBuffer.putInt(position + Long.BYTES, 0);
+ }
+ }
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ Long timeValue = mutationBuffer.getLong(position);
+ Integer stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
+
+ SerializablePairLongString serializablePair;
+
+ if (stringSizeBytes > 0) {
+ byte[] valueBytes = new byte[stringSizeBytes];
+ mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+ mutationBuffer.get(valueBytes, 0, stringSizeBytes);
+ serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
+ } else {
+ serializablePair = new SerializablePairLongString(timeValue, null);
+ }
+
+ return serializablePair;
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("StringLastBufferAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("StringLastBufferAggregator does not support getLong()");
+ }
+
+ @Override
+ public double getDouble(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("StringLastBufferAggregator does not support getDouble()");
+ }
+
+ @Override
+ public void close()
+ {
+ // no resources to cleanup
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("timeSelector", timeSelector);
+ inspector.visit("valueSelector", valueSelector);
+ }
+}
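
Both buffer aggregators truncate with String.length() and substring(), which count UTF-16 code units, while getMaxIntermediateSize() reserves maxStringBytes bytes. A string containing multi-byte characters can therefore encode to more UTF-8 bytes than the limit suggests. The sketch below shows one possible way to truncate by encoded size without splitting a code point. Utf8TruncateSketch and truncateToUtf8Bytes are hypothetical names, and this is not what the commit does.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not part of the commit.
class Utf8TruncateSketch
{
  // Returns the longest prefix of s whose UTF-8 encoding fits in maxBytes,
  // cutting only on code point boundaries.
  static String truncateToUtf8Bytes(String s, int maxBytes)
  {
    int bytes = 0;
    int i = 0;
    while (i < s.length()) {
      int cp = s.codePointAt(i);
      // Encoded size of one code point in UTF-8. An unpaired surrogate is
      // counted as 3 bytes here, which can only overestimate its size.
      int size = cp < 0x80 ? 1 : cp < 0x800 ? 2 : cp < 0x10000 ? 3 : 4;
      if (bytes + size > maxBytes) {
        break;
      }
      bytes += size;
      i += Character.charCount(cp);
    }
    return s.substring(0, i);
  }

  public static void main(String[] args)
  {
    String truncated = truncateToUtf8Bytes("h\u00e9llo", 3);
    System.out.println(truncated.getBytes(StandardCharsets.UTF_8).length);
  }
}
```

With a helper along these lines, the bytes written after the length field would stay within the space accounted for by Long.BYTES + Integer.BYTES + maxStringBytes.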
diff --git
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
new file mode 100644
index 0000000..9bd6a64
--- /dev/null
+++
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import java.nio.ByteBuffer;
+
+@JsonTypeName("stringLastFold")
+public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory
+{
+ public StringLastFoldingAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("maxStringBytes") Integer maxStringBytes
+ )
+ {
+ super(name, fieldName, maxStringBytes);
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
+ {
+ final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+ return new StringLastAggregator(null, null, maxStringBytes)
+ {
+ @Override
+ public void aggregate()
+ {
+ SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+ if (pair != null && pair.lhs >= lastTime) {
+ lastTime = pair.lhs;
+ lastValue = pair.rhs;
+ }
+ }
+ };
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+ {
+ final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+ return new StringLastBufferAggregator(null, null, maxStringBytes)
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+ if (pair != null && pair.lhs != null) {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ long lastTime = mutationBuffer.getLong(position);
+
+ if (pair.lhs >= lastTime) {
+ mutationBuffer.putLong(position, pair.lhs);
+ if (pair.rhs != null) {
+ byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
+
+ mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+ mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+ mutationBuffer.put(valueBytes);
+ } else {
+ mutationBuffer.putInt(position + Long.BYTES, 0);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("selector", selector);
+ }
+ };
+ }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
new file mode 100644
index 0000000..8f523c0
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstAggregationTest
+{
+ private final Integer MAX_STRING_SIZE = 1024;
+ private StringFirstAggregatorFactory stringLastAggFactory;
+ private StringFirstAggregatorFactory combiningAggFactory;
+ private ColumnSelectorFactory colSelectorFactory;
+ private TestLongColumnSelector timeSelector;
+ private TestObjectColumnSelector<String> valueSelector;
+ private TestObjectColumnSelector objectSelector;
+
+ private String[] strings = {"1111", "2222", "3333", null, "4444"};
+ private long[] times = {8224, 6879, 2436, 3546, 7888};
+ private SerializablePairLongString[] pairs = {
+ new SerializablePairLongString(52782L, "AAAA"),
+ new SerializablePairLongString(65492L, "BBBB"),
+ new SerializablePairLongString(69134L, "CCCC"),
+ new SerializablePairLongString(11111L, "DDDD"),
+ new SerializablePairLongString(51223L, null)
+ };
+
+ @Before
+ public void setup()
+ {
+ stringLastAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+ combiningAggFactory = (StringFirstAggregatorFactory) stringLastAggFactory.getCombiningFactory();
+ timeSelector = new TestLongColumnSelector(times);
+ valueSelector = new TestObjectColumnSelector<>(strings);
+ objectSelector = new TestObjectColumnSelector<>(pairs);
+ colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+ EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+ EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+ EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+ EasyMock.replay(colSelectorFactory);
+ }
+
+ @Test
+ public void testStringFirstAggregator()
+ {
+ StringFirstAggregator agg = (StringFirstAggregator) stringLastAggFactory.factorize(colSelectorFactory);
+
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+ Assert.assertEquals(strings[2], result.rhs);
+ }
+
+ @Test
+ public void testStringFirstBufferAggregator()
+ {
+ StringFirstBufferAggregator agg = (StringFirstBufferAggregator) stringLastAggFactory.factorizeBuffered(
+ colSelectorFactory);
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+ agg.init(buffer, 0);
+
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+ Assert.assertEquals(strings[2], result.rhs);
+ }
+
+ @Test
+ public void testCombine()
+ {
+ SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
+ SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
+ Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2));
+ }
+
+ @Test
+ public void testStringFirstCombiningAggregator()
+ {
+ StringFirstAggregator agg = (StringFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);
+
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get();
+ Pair<Long, String> expected = pairs[3];
+
+ Assert.assertEquals(expected.lhs, result.lhs);
+ Assert.assertEquals(expected.rhs, result.rhs);
+ }
+
+ @Test
+ public void testStringFirstCombiningBufferAggregator()
+ {
+ StringFirstBufferAggregator agg = (StringFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
+ colSelectorFactory);
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+ agg.init(buffer, 0);
+
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+ Pair<Long, String> expected = pairs[3];
+
+ Assert.assertEquals(expected.lhs, result.lhs);
+ Assert.assertEquals(expected.rhs, result.rhs);
+ }
+
+ @Test
+ public void testStringFirstAggregateCombiner()
+ {
+ final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+ TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+
+ StringFirstAggregateCombiner stringFirstAggregateCombiner =
+ (StringFirstAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
+
+ stringFirstAggregateCombiner.reset(columnSelector);
+
+ Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+ columnSelector.increment();
+ stringFirstAggregateCombiner.fold(columnSelector);
+
+ Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+ stringFirstAggregateCombiner.reset(columnSelector);
+
+ Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+ }
+
+ private void aggregate(
+ StringFirstAggregator agg
+ )
+ {
+ agg.aggregate();
+ timeSelector.increment();
+ valueSelector.increment();
+ objectSelector.increment();
+ }
+
+ private void aggregate(
+ StringFirstBufferAggregator agg,
+ ByteBuffer buff,
+ int position
+ )
+ {
+ agg.aggregate(buff, position);
+ timeSelector.increment();
+ valueSelector.increment();
+ objectSelector.increment();
+ }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
new file mode 100644
index 0000000..8a4a0de
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class StringFirstBufferAggregatorTest
+{
+ private void aggregateBuffer(
+ TestLongColumnSelector timeSelector,
+ TestObjectColumnSelector valueSelector,
+ BufferAggregator agg,
+ ByteBuffer buf,
+ int position
+ )
+ {
+ agg.aggregate(buf, position);
+ timeSelector.increment();
+ valueSelector.increment();
+ }
+
+ @Test
+ public void testBufferAggregate() throws Exception
+ {
+
+ final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+ final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+ StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes
+ );
+
+ String testString = "ZZZZ";
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ buf.putLong(1526728500L);
+ buf.putInt(testString.length());
+ buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+ }
+
+ SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+ Assert.assertEquals("expected first string value", strings[0], sp.rhs);
+ Assert.assertEquals("first string timestamp is the smallest", new Long(timestamps[0]), new Long(sp.lhs));
+
+ }
+
+ @Test
+ public void testNullBufferAggregate() throws Exception
+ {
+
+ final long[] timestamps = {2222L, 1111L, 3333L, 4444L, 5555L};
+ final String[] strings = {null, "AAAA", "BBBB", "DDDD", "EEEE"};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+ StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes
+ );
+
+ String testString = "ZZZZ";
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ buf.putLong(1526728500L);
+ buf.putInt(testString.length());
+ buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+ }
+
+ SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+ Assert.assertEquals("expected first string value", strings[1], sp.rhs);
+ Assert.assertEquals("first string timestamp is the smallest", new Long(timestamps[1]), new Long(sp.lhs));
+
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNoStringValue()
+ {
+
+ final long[] timestamps = {1526724000L, 1526724600L};
+ final Double[] doubles = {null, 2.00};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
+
+ StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes
+ );
+
+ String testString = "ZZZZ";
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ buf.putLong(1526728500L);
+ buf.putInt(testString.length());
+ buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+ }
+ }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
new file mode 100644
index 0000000..bac9a6d
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringFirstTimeseriesQueryTest
+{
+
+ @Test
+ public void testTimeseriesWithStringFirstAgg() throws Exception
+ {
+ TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+ String visitor_id = "visitor_id";
+ String client_type = "client_type";
+
+ IncrementalIndex index = new IncrementalIndex.Builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.SECOND)
+ .withMetrics(new CountAggregatorFactory("cnt"))
+ .withMetrics(new StringFirstAggregatorFactory(
+ "last_client_type", "client_type", 1024)
+ )
+ .build()
+ )
+ .setMaxRowCount(1000)
+ .buildOnheap();
+
+
+ DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+ long timestamp = time.getMillis();
+
+ DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+ long timestamp1 = time1.getMillis();
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp1,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "android")
+ )
+ );
+
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .aggregators(
+ Lists.newArrayList(
+ new StringFirstAggregatorFactory(
+ "last_client_type", client_type, 1024
+ )
+ )
+ )
+ .build();
+
+ final Iterable<Result<TimeseriesResultValue>> results =
+ engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+
+ List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+ new Result<>(
+ time,
+ new TimeseriesResultValue(
+ ImmutableMap.<String, Object>of("last_client_type", new SerializablePairLongString(timestamp, "iphone"))
+ )
+ )
+ );
+ TestHelper.assertExpectedResults(expectedResults, results);
+ }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
new file mode 100644
index 0000000..1f2ecc4
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringLastAggregationTest
+{
+ private final Integer MAX_STRING_SIZE = 1024;
+ private StringLastAggregatorFactory stringLastAggFactory;
+ private StringLastAggregatorFactory combiningAggFactory;
+ private ColumnSelectorFactory colSelectorFactory;
+ private TestLongColumnSelector timeSelector;
+ private TestObjectColumnSelector<String> valueSelector;
+ private TestObjectColumnSelector objectSelector;
+
+ private String[] strings = {"1111", "2222", "3333", null, "4444"};
+ private long[] times = {8224, 6879, 2436, 3546, 7888};
+ private SerializablePairLongString[] pairs = {
+ new SerializablePairLongString(52782L, "AAAA"),
+ new SerializablePairLongString(65492L, "BBBB"),
+ new SerializablePairLongString(69134L, "CCCC"),
+ new SerializablePairLongString(11111L, "DDDD"),
+ new SerializablePairLongString(51223L, null)
+ };
+
+ @Before
+ public void setup()
+ {
+ stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+ combiningAggFactory = (StringLastAggregatorFactory) stringLastAggFactory.getCombiningFactory();
+ timeSelector = new TestLongColumnSelector(times);
+ valueSelector = new TestObjectColumnSelector<>(strings);
+ objectSelector = new TestObjectColumnSelector<>(pairs);
+ colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+ EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+ EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+ EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+ EasyMock.replay(colSelectorFactory);
+ }
+
+ @Test
+ public void testStringLastAggregator()
+ {
+ StringLastAggregator agg = (StringLastAggregator) stringLastAggFactory.factorize(colSelectorFactory);
+
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+ Assert.assertEquals(strings[0], result.rhs);
+ }
+
+ @Test
+ public void testStringLastBufferAggregator()
+ {
+ StringLastBufferAggregator agg = (StringLastBufferAggregator) stringLastAggFactory.factorizeBuffered(
+ colSelectorFactory);
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+ agg.init(buffer, 0);
+
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+ Assert.assertEquals(strings[0], result.rhs);
+ }
+
+ @Test
+ public void testCombine()
+ {
+ SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
+ SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
+ Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2));
+ }
+
+ @Test
+ public void testStringLastCombiningAggregator()
+ {
+ StringLastAggregator agg = (StringLastAggregator) combiningAggFactory.factorize(colSelectorFactory);
+
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+ aggregate(agg);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get();
+ Pair<Long, String> expected = (Pair<Long, String>) pairs[2];
+
+ Assert.assertEquals(expected.lhs, result.lhs);
+ Assert.assertEquals(expected.rhs, result.rhs);
+ }
+
+ @Test
+ public void testStringLastCombiningBufferAggregator()
+ {
+ StringLastBufferAggregator agg = (StringLastBufferAggregator) combiningAggFactory.factorizeBuffered(
+ colSelectorFactory);
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+ agg.init(buffer, 0);
+
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+ aggregate(agg, buffer, 0);
+
+ Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+ Pair<Long, String> expected = (Pair<Long, String>) pairs[2];
+
+ Assert.assertEquals(expected.lhs, result.lhs);
+ Assert.assertEquals(expected.rhs, result.rhs);
+ }
+
+ @Test
+ public void testStringLastAggregateCombiner()
+ {
+ final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+ TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+
+ StringLastAggregateCombiner stringLastAggregateCombiner =
+ (StringLastAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
+
+ stringLastAggregateCombiner.reset(columnSelector);
+
+ Assert.assertEquals(strings[0], stringLastAggregateCombiner.getObject());
+
+ columnSelector.increment();
+ stringLastAggregateCombiner.fold(columnSelector);
+
+ Assert.assertEquals(strings[1], stringLastAggregateCombiner.getObject());
+
+ stringLastAggregateCombiner.reset(columnSelector);
+
+ Assert.assertEquals(strings[1], stringLastAggregateCombiner.getObject());
+ }
+
+ private void aggregate(
+ StringLastAggregator agg
+ )
+ {
+ agg.aggregate();
+ timeSelector.increment();
+ valueSelector.increment();
+ objectSelector.increment();
+ }
+
+ private void aggregate(
+ StringLastBufferAggregator agg,
+ ByteBuffer buff,
+ int position
+ )
+ {
+ agg.aggregate(buff, position);
+ timeSelector.increment();
+ valueSelector.increment();
+ objectSelector.increment();
+ }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
new file mode 100644
index 0000000..c7c125b
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class StringLastBufferAggregatorTest
+{
+ private void aggregateBuffer(
+ TestLongColumnSelector timeSelector,
+ TestObjectColumnSelector valueSelector,
+ BufferAggregator agg,
+ ByteBuffer buf,
+ int position
+ )
+ {
+ agg.aggregate(buf, position);
+ timeSelector.increment();
+ valueSelector.increment();
+ }
+
+ @Test
+ public void testBufferAggregate() throws Exception
+ {
+
+ final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+ final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+ StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringLastBufferAggregator agg = new StringLastBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes
+ );
+
+ String testString = "ZZZZ";
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ buf.putLong(1526728500L);
+ buf.putInt(testString.length());
+ buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+ }
+
+ SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+ Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
+ Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
+
+ }
+
+ @Test
+ public void testNullBufferAggregate() throws Exception
+ {
+
+ final long[] timestamps = {1111L, 2222L, 6666L, 4444L, 5555L};
+ final String[] strings = {"CCCC", "AAAA", "BBBB", null, "EEEE"};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+ StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringLastBufferAggregator agg = new StringLastBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes
+ );
+
+ String testString = "ZZZZ";
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ buf.putLong(1526728500L);
+ buf.putInt(testString.length());
+ buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+ }
+
+ SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+ Assert.assertEquals("expected last string value", strings[2], sp.rhs);
+ Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs));
+
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNoStringValue()
+ {
+
+ final long[] timestamps = {1526724000L, 1526724600L};
+ final Double[] doubles = {null, 2.00};
+ Integer maxStringBytes = 1024;
+
+ TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+ TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
+
+ StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+ "billy", "billy", maxStringBytes
+ );
+
+ StringLastBufferAggregator agg = new StringLastBufferAggregator(
+ longColumnSelector,
+ objectColumnSelector,
+ maxStringBytes
+ );
+
+ String testString = "ZZZZ";
+
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ buf.putLong(1526728500L);
+ buf.putInt(testString.length());
+ buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+ int position = 0;
+
+ agg.init(buf, position);
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < timestamps.length; i++) {
+ aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+ }
+ }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
new file mode 100644
index 0000000..a68798e
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringLastTimeseriesQueryTest
+{
+
+ @Test
+ public void testTopNWithDistinctCountAgg() throws Exception
+ {
+ TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+ String visitor_id = "visitor_id";
+ String client_type = "client_type";
+
+ IncrementalIndex index = new IncrementalIndex.Builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.SECOND)
+ .withMetrics(new CountAggregatorFactory("cnt"))
+ .withMetrics(new StringLastAggregatorFactory(
+ "last_client_type", "client_type", 1024)
+ )
+ .build()
+ )
+ .setMaxRowCount(1000)
+ .buildOnheap();
+
+
+ DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+ long timestamp = time.getMillis();
+
+ DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+ long timestamp1 = time1.getMillis();
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp1,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "android")
+ )
+ );
+
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .aggregators(
+ Lists.newArrayList(
+ new StringLastAggregatorFactory(
+ "last_client_type", client_type, 1024
+ )
+ )
+ )
+ .build();
+
+ final Iterable<Result<TimeseriesResultValue>> results =
+ engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+
+ List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+ new Result<>(
+ time,
+ new TimeseriesResultValue(
+ ImmutableMap.<String, Object>of(
+ "last_client_type",
+ new SerializablePairLongString(timestamp1, "android")
+ )
+ )
+ )
+ );
+ TestHelper.assertExpectedResults(expectedResults, results);
+ }
+}