This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new 8114861 first/last aggregators and nulls (#9161) (#9233)
8114861 is described below
commit 81148619552e2baeda9ae9a1c5ecfbce51a25a96
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jan 21 09:01:35 2020 -0800
first/last aggregators and nulls (#9161) (#9233)
* null handling for numeric first/last aggregators, refactor to not extend
nullable numeric agg since they are complex typed aggs
* initially null or not based on config
* review stuff, make string first/last consistent with null handling of
numeric columns, more tests
* docs
* handle nil selectors, revert to primitive first/last types so groupby v1
works...
---
.../apache/druid/collections/SerializablePair.java | 22 ++
docs/querying/aggregations.md | 12 +-
docs/querying/sql.md | 4 +-
.../aggregation/first/DoubleFirstAggregator.java | 31 +-
.../first/DoubleFirstAggregatorFactory.java | 107 ++++--
.../first/DoubleFirstBufferAggregator.java | 46 +--
.../aggregation/first/FloatFirstAggregator.java | 33 +-
.../first/FloatFirstAggregatorFactory.java | 103 ++++--
.../first/FloatFirstBufferAggregator.java | 46 +--
.../aggregation/first/LongFirstAggregator.java | 31 +-
.../first/LongFirstAggregatorFactory.java | 102 ++++--
.../first/LongFirstBufferAggregator.java | 46 +--
...Aggregator.java => NumericFirstAggregator.java} | 61 ++--
...ator.java => NumericFirstBufferAggregator.java} | 78 ++--
.../aggregation/first/StringFirstAggregator.java | 9 +-
.../first/StringFirstAggregatorFactory.java | 62 +++-
.../first/StringFirstBufferAggregator.java | 16 +-
.../aggregation/last/DoubleLastAggregator.java | 31 +-
.../last/DoubleLastAggregatorFactory.java | 107 ++++--
.../last/DoubleLastBufferAggregator.java | 46 +--
.../aggregation/last/FloatLastAggregator.java | 33 +-
.../last/FloatLastAggregatorFactory.java | 99 +++--
.../last/FloatLastBufferAggregator.java | 51 +--
.../query/aggregation/last/LongLastAggregator.java | 38 +-
.../last/LongLastAggregatorFactory.java | 97 +++--
.../aggregation/last/LongLastBufferAggregator.java | 46 +--
...tAggregator.java => NumericLastAggregator.java} | 63 ++--
.../last/NumericLastBufferAggregator.java | 115 ++++++
.../aggregation/last/StringLastAggregator.java | 9 +-
.../last/StringLastAggregatorFactory.java | 62 +++-
.../last/StringLastBufferAggregator.java | 16 +-
.../first/DoubleFirstAggregationTest.java | 29 +-
.../first/FloatFirstAggregationTest.java | 24 +-
.../first/LongFirstAggregationTest.java | 16 +-
.../first/StringFirstBufferAggregatorTest.java | 4 +-
.../first/StringFirstTimeseriesQueryTest.java | 3 +-
.../last/DoubleLastAggregationTest.java | 16 +-
.../aggregation/last/FloatLastAggregationTest.java | 20 +-
.../aggregation/last/LongLastAggregationTest.java | 16 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 399 ++++++++++++++++++++-
40 files changed, 1455 insertions(+), 694 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/collections/SerializablePair.java
b/core/src/main/java/org/apache/druid/collections/SerializablePair.java
index 46b3995..22847cb 100644
--- a/core/src/main/java/org/apache/druid/collections/SerializablePair.java
+++ b/core/src/main/java/org/apache/druid/collections/SerializablePair.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.Pair;
import javax.annotation.Nullable;
+import java.util.Comparator;
public class SerializablePair<T1, T2> extends Pair<T1, T2>
{
@@ -45,4 +46,25 @@ public class SerializablePair<T1, T2> extends Pair<T1, T2>
{
return rhs;
}
+
+ public static <T1, T2> Comparator<SerializablePair<T1, T2>>
createNullHandlingComparator(
+ Comparator<T2> delegate,
+ boolean nullsFirst
+ )
+ {
+ final int firstIsNull = nullsFirst ? -1 : 1;
+ final int secondIsNull = nullsFirst ? 1 : -1;
+ return (o1, o2) -> {
+ if (o1 == null || o1.rhs == null) {
+ if (o2 == null || o2.rhs == null) {
+ return 0;
+ }
+ return firstIsNull;
+ }
+ if (o2 == null || o2.rhs == null) {
+ return secondIsNull;
+ }
+ return delegate.compare(o1.rhs, o2.rhs);
+ };
+ }
}
diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md
index c1d8d65..688b9ae 100644
--- a/docs/querying/aggregations.md
+++ b/docs/querying/aggregations.md
@@ -136,7 +136,7 @@ Note that queries with first/last aggregators on a segment
created with rollup e
#### `doubleFirst` aggregator
-`doubleFirst` computes the metric value with the minimum timestamp or 0 if no
row exist
+`doubleFirst` computes the metric value with the minimum timestamp or 0 in
default mode or `null` in SQL compatible mode if no rows exist
```json
{
@@ -148,7 +148,7 @@ Note that queries with first/last aggregators on a segment
created with rollup e
#### `doubleLast` aggregator
-`doubleLast` computes the metric value with the maximum timestamp or 0 if no
row exist
+`doubleLast` computes the metric value with the maximum timestamp or 0 in
default mode or `null` in SQL compatible mode if no rows exist
```json
{
@@ -160,7 +160,7 @@ Note that queries with first/last aggregators on a segment
created with rollup e
#### `floatFirst` aggregator
-`floatFirst` computes the metric value with the minimum timestamp or 0 if no
row exist
+`floatFirst` computes the metric value with the minimum timestamp or 0 in
default mode or `null` in SQL compatible mode if no rows exist
```json
{
@@ -172,7 +172,7 @@ Note that queries with first/last aggregators on a segment
created with rollup e
#### `floatLast` aggregator
-`floatLast` computes the metric value with the maximum timestamp or 0 if no
row exist
+`floatLast` computes the metric value with the maximum timestamp or 0 in
default mode or `null` in SQL compatible mode if no rows exist
```json
{
@@ -184,7 +184,7 @@ Note that queries with first/last aggregators on a segment
created with rollup e
#### `longFirst` aggregator
-`longFirst` computes the metric value with the minimum timestamp or 0 if no
row exist
+`longFirst` computes the metric value with the minimum timestamp or 0 in
default mode or `null` in SQL compatible mode if no rows exist
```json
{
@@ -196,7 +196,7 @@ Note that queries with first/last aggregators on a segment
created with rollup e
#### `longLast` aggregator
-`longLast` computes the metric value with the maximum timestamp or 0 if no row
exist
+`longLast` computes the metric value with the maximum timestamp or 0 in
default mode or `null` in SQL compatible mode if no rows exist
```json
{
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 8aa1367..f29618d 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -199,9 +199,9 @@ Only the COUNT aggregation can accept DISTINCT.
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See
[stats extension](../development/extensions-core/stats.html) documentation for
additional details.|
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats
extension](../development/extensions-core/stats.html) documentation for
additional details.|
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats
extension](../development/extensions-core/stats.html) documentation for
additional details.|
-|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be
numeric. If `expr` comes from a relation with a timestamp column (like a Druid
datasource) then "earliest" is the value first encountered with the minimum
overall timestamp of all values being aggregated. If `expr` does not come from
a relation with a timestamp, then it is simply the first value encountered.|
+|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric.
If `expr` comes from a relation with a timestamp column (like a Druid
datasource) then "earliest" is the value first encountered with the minimum
overall timestamp of all values being aggregated. If `expr` does not come from
a relation with a timestamp, then it is simply the first value encountered.|
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings.
The `maxBytesPerString` parameter determines how much aggregation space to
allocate per string. Strings longer than this limit will be truncated. This
parameter should be set as low as possible, since high values will lead to
wasted memory.|
-|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be
numeric. If `expr` comes from a relation with a timestamp column (like a Druid
datasource) then "latest" is the value last encountered with the maximum
overall timestamp of all values being aggregated. If `expr` does not come from
a relation with a timestamp, then it is simply the last value encountered.|
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If
`expr` comes from a relation with a timestamp column (like a Druid datasource)
then "latest" is the value last encountered with the maximum overall timestamp
of all values being aggregated. If `expr` does not come from a relation with a
timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The
`maxBytesPerString` parameter determines how much aggregation space to allocate
per string. Strings longer than this limit will be truncated. This parameter
should be set as low as possible, since high values will lead to wasted memory.|
For advice on choosing approximate aggregation functions, check out our
[approximate aggregations documentation](aggregations.html#approx).
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java
index 11a3558..8b4a89d 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java
@@ -20,42 +20,29 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
-public class DoubleFirstAggregator implements Aggregator
+public class DoubleFirstAggregator extends
NumericFirstAggregator<BaseDoubleColumnValueSelector>
{
-
- private final BaseDoubleColumnValueSelector valueSelector;
- private final BaseLongColumnValueSelector timeSelector;
-
- protected long firstTime;
- protected double firstValue;
+ double firstValue;
public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector,
BaseDoubleColumnValueSelector valueSelector)
{
- this.valueSelector = valueSelector;
- this.timeSelector = timeSelector;
-
- firstTime = Long.MAX_VALUE;
+ super(timeSelector, valueSelector);
firstValue = 0;
}
@Override
- public void aggregate()
+ void setCurrentValue()
{
- long time = timeSelector.getLong();
- if (time < firstTime) {
- firstTime = time;
- firstValue = valueSelector.getDouble();
- }
+ firstValue = valueSelector.getDouble();
}
@Override
public Object get()
{
- return new SerializablePair<>(firstTime, firstValue);
+ return new SerializablePair<>(firstTime, rhsNull ? null : firstValue);
}
@Override
@@ -75,11 +62,5 @@ public class DoubleFirstAggregator implements Aggregator
{
return (long) firstValue;
}
-
- @Override
- public void close()
- {
-
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
index eb39e35..0920ee3 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
@@ -30,10 +30,11 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -45,10 +46,34 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class DoubleFirstAggregatorFactory extends
NullableNumericAggregatorFactory<ColumnValueSelector>
+public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new DoubleFirstAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
DoubleFirstBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
+
public static final Comparator<SerializablePair<Long, Double>>
VALUE_COMPARATOR =
- Comparator.comparingDouble(o -> o.rhs);
+ SerializablePair.createNullHandlingComparator(Double::compare, true);
private final String fieldName;
private final String name;
@@ -69,24 +94,31 @@ public class DoubleFirstAggregatorFactory extends
NullableNumericAggregatorFacto
}
@Override
- protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return metricFactory.makeColumnValueSelector(fieldName);
- }
-
- @Override
- protected Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
- {
- return new
DoubleFirstAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector);
+ final BaseDoubleColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new DoubleFirstAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
- protected BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new DoubleFirstBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- selector
- );
+ final BaseDoubleColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new DoubleFirstBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
@@ -126,35 +158,54 @@ public class DoubleFirstAggregatorFactory extends
NullableNumericAggregatorFacto
return new DoubleFirstAggregatorFactory(name, name)
{
@Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Double>> selector =
+ metricFactory.makeColumnValueSelector(name);
return new DoubleFirstAggregator(null, null)
{
@Override
public void aggregate()
{
- SerializablePair<Long, Double> pair = (SerializablePair<Long,
Double>) selector.getObject();
+ SerializablePair<Long, Double> pair = selector.getObject();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
- firstValue = pair.rhs;
+ if (pair.rhs != null) {
+ firstValue = pair.rhs;
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
};
}
@Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Double>> selector =
+ metricFactory.makeColumnValueSelector(name);
return new DoubleFirstBufferAggregator(null, null)
{
@Override
+ public void putValue(ByteBuffer buf, int position)
+ {
+ SerializablePair<Long, Double> pair = selector.getObject();
+ buf.putDouble(position, pair.rhs);
+ }
+
+ @Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long,
Double>) selector.getObject();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
- buf.putLong(position, pair.lhs);
- buf.putDouble(position + Long.BYTES, pair.rhs);
+ if (pair.rhs != null) {
+ updateTimeWithValue(buf, position, pair.lhs);
+ } else {
+ updateTimeWithNull(buf, position, pair.lhs);
+ }
}
}
@@ -178,6 +229,9 @@ public class DoubleFirstAggregatorFactory extends
NullableNumericAggregatorFacto
public Object deserialize(Object object)
{
Map map = (Map) object;
+ if (map.get("rhs") == null) {
+ return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
null);
+ }
return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).doubleValue());
}
@@ -221,16 +275,15 @@ public class DoubleFirstAggregatorFactory extends
NullableNumericAggregatorFacto
@Override
public String getTypeName()
{
- if (storeDoubleAsFloat) {
- return "float";
- }
- return "double";
+ // if we don't pretend to be a primitive, group by v1 gets sad and doesn't
work because no complex type serde
+ return storeDoubleAsFloat ? "float" : "double";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES + Double.BYTES;
+ // timestamp, is null, value
+ return Long.BYTES + Byte.BYTES + Double.BYTES;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
index e917c68..dabade4 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
@@ -20,79 +20,55 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
-public class DoubleFirstBufferAggregator implements BufferAggregator
+public class DoubleFirstBufferAggregator extends
NumericFirstBufferAggregator<BaseDoubleColumnValueSelector>
{
- private final BaseLongColumnValueSelector timeSelector;
- private final BaseDoubleColumnValueSelector valueSelector;
-
public DoubleFirstBufferAggregator(
BaseLongColumnValueSelector timeSelector,
BaseDoubleColumnValueSelector valueSelector
)
{
- this.timeSelector = timeSelector;
- this.valueSelector = valueSelector;
+ super(timeSelector, valueSelector);
}
@Override
- public void init(ByteBuffer buf, int position)
+ void initValue(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MAX_VALUE);
- buf.putDouble(position + Long.BYTES, 0);
+ buf.putDouble(position, 0);
}
@Override
- public void aggregate(ByteBuffer buf, int position)
+ void putValue(ByteBuffer buf, int position)
{
- long time = timeSelector.getLong();
- long firstTime = buf.getLong(position);
- if (time < firstTime) {
- buf.putLong(position, time);
- buf.putDouble(position + Long.BYTES, valueSelector.getDouble());
- }
+ buf.putDouble(position, valueSelector.getDouble());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- return new SerializablePair<>(buf.getLong(position),
buf.getDouble(position + Long.BYTES));
+ final boolean rhsNull = isValueNull(buf, position);
+ return new SerializablePair<>(buf.getLong(position), rhsNull ? null :
buf.getDouble(position + VALUE_OFFSET));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
- return (float) buf.getDouble(position + Long.BYTES);
+ return (float) buf.getDouble(position + VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
- return (long) buf.getDouble(position + Long.BYTES);
+ return (long) buf.getDouble(position + VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
- return buf.getDouble(position + Long.BYTES);
- }
-
- @Override
- public void close()
- {
- // no resources to cleanup
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("timeSelector", timeSelector);
- inspector.visit("valueSelector", valueSelector);
+ return buf.getDouble(position + VALUE_OFFSET);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java
index af7167c..2c0f629 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java
@@ -20,45 +20,32 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
-public class FloatFirstAggregator implements Aggregator
+public class FloatFirstAggregator extends
NumericFirstAggregator<BaseFloatColumnValueSelector>
{
-
- private final BaseFloatColumnValueSelector valueSelector;
- private final BaseLongColumnValueSelector timeSelector;
-
- protected long firstTime;
- protected float firstValue;
+ float firstValue;
public FloatFirstAggregator(
BaseLongColumnValueSelector timeSelector,
BaseFloatColumnValueSelector valueSelector
)
{
- this.valueSelector = valueSelector;
- this.timeSelector = timeSelector;
-
- firstTime = Long.MAX_VALUE;
+ super(timeSelector, valueSelector);
firstValue = 0;
}
@Override
- public void aggregate()
+ void setCurrentValue()
{
- long time = timeSelector.getLong();
- if (time < firstTime) {
- firstTime = time;
- firstValue = valueSelector.getFloat();
- }
+ firstValue = valueSelector.getFloat();
}
@Override
public Object get()
{
- return new SerializablePair<>(firstTime, firstValue);
+ return new SerializablePair<>(firstTime, rhsNull ? null : firstValue);
}
@Override
@@ -70,7 +57,7 @@ public class FloatFirstAggregator implements Aggregator
@Override
public double getDouble()
{
- return (double) firstValue;
+ return firstValue;
}
@Override
@@ -78,11 +65,5 @@ public class FloatFirstAggregator implements Aggregator
{
return (long) firstValue;
}
-
- @Override
- public void close()
- {
-
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
index 4a2d1c1..74d10fa 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
@@ -30,10 +30,11 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -45,10 +46,34 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class FloatFirstAggregatorFactory extends
NullableNumericAggregatorFactory<ColumnValueSelector>
+public class FloatFirstAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new FloatFirstAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
FloatFirstBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
+
public static final Comparator<SerializablePair<Long, Float>>
VALUE_COMPARATOR =
- Comparator.comparingDouble(o -> o.rhs);
+ SerializablePair.createNullHandlingComparator(Float::compare, true);
private final String fieldName;
private final String name;
@@ -67,24 +92,31 @@ public class FloatFirstAggregatorFactory extends
NullableNumericAggregatorFactor
}
@Override
- protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
- {
- return metricFactory.makeColumnValueSelector(fieldName);
- }
-
- @Override
- protected Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return new
FloatFirstAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector);
+ final BaseFloatColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new FloatFirstAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
- protected BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new FloatFirstBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- selector
- );
+ final BaseFloatColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new FloatFirstBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
@@ -121,38 +153,56 @@ public class FloatFirstAggregatorFactory extends
NullableNumericAggregatorFactor
@Override
public AggregatorFactory getCombiningFactory()
{
+
return new FloatFirstAggregatorFactory(name, name)
{
@Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Float>> selector =
metricFactory.makeColumnValueSelector(name);
return new FloatFirstAggregator(null, null)
{
@Override
public void aggregate()
{
- SerializablePair<Long, Float> pair = (SerializablePair<Long,
Float>) selector.getObject();
+ SerializablePair<Long, Float> pair = selector.getObject();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
- firstValue = pair.rhs;
+ if (pair.rhs != null) {
+ firstValue = pair.rhs;
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
};
}
@Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Float>> selector =
metricFactory.makeColumnValueSelector(name);
return new FloatFirstBufferAggregator(null, null)
{
@Override
+ public void putValue(ByteBuffer buf, int position)
+ {
+ SerializablePair<Long, Float> pair = selector.getObject();
+ buf.putFloat(position, pair.rhs);
+ }
+
+ @Override
public void aggregate(ByteBuffer buf, int position)
{
- SerializablePair<Long, Float> pair = (SerializablePair<Long,
Float>) selector.getObject();
+ SerializablePair<Long, Float> pair = selector.getObject();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
- buf.putLong(position, pair.lhs);
- buf.putFloat(position + Long.BYTES, pair.rhs);
+ if (pair.rhs != null) {
+ updateTimeWithValue(buf, position, pair.lhs);
+ } else {
+ updateTimeWithNull(buf, position, pair.lhs);
+ }
}
}
@@ -176,6 +226,9 @@ public class FloatFirstAggregatorFactory extends
NullableNumericAggregatorFactor
public Object deserialize(Object object)
{
Map map = (Map) object;
+ if (map.get("rhs") == null) {
+ return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
null);
+ }
return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).floatValue());
}
@@ -219,13 +272,15 @@ public class FloatFirstAggregatorFactory extends
NullableNumericAggregatorFactor
@Override
public String getTypeName()
{
+ // if we don't pretend to be a primitive, group by v1 gets sad and doesn't
work because no complex type serde
return "float";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES + Float.BYTES;
+ // timestamp, is null, value
+ return Long.BYTES + Byte.BYTES + Float.BYTES;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java
index d7797b5..cf7d272 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java
@@ -20,79 +20,55 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
-public class FloatFirstBufferAggregator implements BufferAggregator
+public class FloatFirstBufferAggregator extends
NumericFirstBufferAggregator<BaseFloatColumnValueSelector>
{
- private final BaseLongColumnValueSelector timeSelector;
- private final BaseFloatColumnValueSelector valueSelector;
-
public FloatFirstBufferAggregator(
BaseLongColumnValueSelector timeSelector,
BaseFloatColumnValueSelector valueSelector
)
{
- this.timeSelector = timeSelector;
- this.valueSelector = valueSelector;
+ super(timeSelector, valueSelector);
}
@Override
- public void init(ByteBuffer buf, int position)
+ void initValue(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MAX_VALUE);
- buf.putFloat(position + Long.BYTES, 0);
+ buf.putFloat(position, 0);
}
@Override
- public void aggregate(ByteBuffer buf, int position)
+ void putValue(ByteBuffer buf, int position)
{
- long time = timeSelector.getLong();
- long firstTime = buf.getLong(position);
- if (time < firstTime) {
- buf.putLong(position, time);
- buf.putFloat(position + Long.BYTES, valueSelector.getFloat());
- }
+ buf.putFloat(position, valueSelector.getFloat());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- return new SerializablePair<>(buf.getLong(position), buf.getFloat(position + Long.BYTES));
+ final boolean rhsNull = isValueNull(buf, position);
+ return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
- return buf.getFloat(position + Long.BYTES);
+ return buf.getFloat(position + VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
- return (long) buf.getFloat(position + Long.BYTES);
+ return (long) buf.getFloat(position + VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
- return (double) buf.getFloat(position + Long.BYTES);
- }
-
- @Override
- public void close()
- {
- // no resources to cleanup
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("timeSelector", timeSelector);
- inspector.visit("valueSelector", valueSelector);
+ return buf.getFloat(position + VALUE_OFFSET);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
index d1fd9ae..8cda544 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
@@ -20,41 +20,28 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseLongColumnValueSelector;
-public class LongFirstAggregator implements Aggregator
+public class LongFirstAggregator extends
NumericFirstAggregator<BaseLongColumnValueSelector>
{
-
- private final BaseLongColumnValueSelector valueSelector;
- private final BaseLongColumnValueSelector timeSelector;
-
- protected long firstTime;
- protected long firstValue;
+ long firstValue;
public LongFirstAggregator(BaseLongColumnValueSelector timeSelector,
BaseLongColumnValueSelector valueSelector)
{
- this.valueSelector = valueSelector;
- this.timeSelector = timeSelector;
-
- firstTime = Long.MAX_VALUE;
+ super(timeSelector, valueSelector);
firstValue = 0;
}
@Override
- public void aggregate()
+ void setCurrentValue()
{
- long time = timeSelector.getLong();
- if (time < firstTime) {
- firstTime = time;
- firstValue = valueSelector.getLong();
- }
+ firstValue = valueSelector.getLong();
}
@Override
public Object get()
{
- return new SerializablePair<>(firstTime, firstValue);
+ return new SerializablePair<>(firstTime, rhsNull ? null : firstValue);
}
@Override
@@ -74,10 +61,4 @@ public class LongFirstAggregator implements Aggregator
{
return firstValue;
}
-
- @Override
- public void close()
- {
-
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
index 399f1fd..5b7e16d 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
@@ -30,10 +30,11 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -44,10 +45,34 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
-public class LongFirstAggregatorFactory extends
NullableNumericAggregatorFactory<ColumnValueSelector>
+public class LongFirstAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new LongFirstAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
LongFirstBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
+
public static final Comparator<SerializablePair<Long, Long>>
VALUE_COMPARATOR =
- Comparator.comparingLong(o -> o.rhs);
+ SerializablePair.createNullHandlingComparator(Long::compare, true);
private final String fieldName;
private final String name;
@@ -66,24 +91,31 @@ public class LongFirstAggregatorFactory extends
NullableNumericAggregatorFactory
}
@Override
- protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return metricFactory.makeColumnValueSelector(fieldName);
- }
-
- @Override
- protected Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
- {
- return new LongFirstAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), selector);
+ final BaseLongColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new LongFirstAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
- protected BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new LongFirstBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- selector
- );
+ final BaseLongColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new LongFirstBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
@@ -123,35 +155,52 @@ public class LongFirstAggregatorFactory extends
NullableNumericAggregatorFactory
return new LongFirstAggregatorFactory(name, name)
{
@Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Long>> selector =
metricFactory.makeColumnValueSelector(name);
return new LongFirstAggregator(null, null)
{
@Override
public void aggregate()
{
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>)
selector.getObject();
+ SerializablePair<Long, Long> pair = selector.getObject();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
- firstValue = pair.rhs;
+ if (pair.rhs != null) {
+ firstValue = pair.rhs;
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
};
}
@Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Long>> selector =
metricFactory.makeColumnValueSelector(name);
return new LongFirstBufferAggregator(null, null)
{
@Override
+ public void putValue(ByteBuffer buf, int position)
+ {
+ SerializablePair<Long, Long> pair = selector.getObject();
+ buf.putLong(position, pair.rhs);
+ }
+
+ @Override
public void aggregate(ByteBuffer buf, int position)
{
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>)
selector.getObject();
+ SerializablePair<Long, Long> pair = selector.getObject();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
- buf.putLong(position, pair.lhs);
- buf.putLong(position + Long.BYTES, pair.rhs);
+ if (pair.rhs != null) {
+ updateTimeWithValue(buf, position, pair.lhs);
+ } else {
+ updateTimeWithNull(buf, position, pair.lhs);
+ }
}
}
@@ -175,6 +224,9 @@ public class LongFirstAggregatorFactory extends
NullableNumericAggregatorFactory
public Object deserialize(Object object)
{
Map map = (Map) object;
+ if (map.get("rhs") == null) {
+ return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
null);
+ }
return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).longValue());
}
@@ -218,13 +270,15 @@ public class LongFirstAggregatorFactory extends
NullableNumericAggregatorFactory
@Override
public String getTypeName()
{
+ // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde
return "long";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES * 2;
+ // timestamp, is null, value
+ return Long.BYTES + Byte.BYTES + Long.BYTES;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java
index 2ef812a..582cda1 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java
@@ -20,75 +20,51 @@
package org.apache.druid.query.aggregation.first;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
-public class LongFirstBufferAggregator implements BufferAggregator
+public class LongFirstBufferAggregator extends
NumericFirstBufferAggregator<BaseLongColumnValueSelector>
{
- private final BaseLongColumnValueSelector timeSelector;
- private final BaseLongColumnValueSelector valueSelector;
-
public LongFirstBufferAggregator(BaseLongColumnValueSelector timeSelector,
BaseLongColumnValueSelector valueSelector)
{
- this.timeSelector = timeSelector;
- this.valueSelector = valueSelector;
+ super(timeSelector, valueSelector);
}
@Override
- public void init(ByteBuffer buf, int position)
+ void initValue(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MAX_VALUE);
- buf.putLong(position + Long.BYTES, 0);
+ buf.putLong(position, 0);
}
@Override
- public void aggregate(ByteBuffer buf, int position)
+ void putValue(ByteBuffer buf, int position)
{
- long time = timeSelector.getLong();
- long firstTime = buf.getLong(position);
- if (time < firstTime) {
- buf.putLong(position, time);
- buf.putLong(position + Long.BYTES, valueSelector.getLong());
- }
+ buf.putLong(position, valueSelector.getLong());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Long.BYTES));
+ final boolean rhsNull = isValueNull(buf, position);
+ return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
- return (float) buf.getLong(position + Long.BYTES);
+ return (float) buf.getLong(position + VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
- return buf.getLong(position + Long.BYTES);
+ return buf.getLong(position + VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
- return (double) buf.getLong(position + Long.BYTES);
- }
-
- @Override
- public void close()
- {
- // no resources to cleanup
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("timeSelector", timeSelector);
- inspector.visit("valueSelector", valueSelector);
+ return (double) buf.getLong(position + VALUE_OFFSET);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
similarity index 60%
copy from
processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
copy to
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
index d1fd9ae..e7b60b9 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
@@ -19,65 +19,56 @@
package org.apache.druid.query.aggregation.first;
-import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseNullableColumnValueSelector;
-public class LongFirstAggregator implements Aggregator
+/**
+ * Base type for on heap 'first' aggregator for primitive numeric column selectors
+ */
+public abstract class NumericFirstAggregator<TSelector extends
BaseNullableColumnValueSelector> implements Aggregator
{
-
- private final BaseLongColumnValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
private final BaseLongColumnValueSelector timeSelector;
- protected long firstTime;
- protected long firstValue;
+ final TSelector valueSelector;
+
+ long firstTime;
+ boolean rhsNull;
- public LongFirstAggregator(BaseLongColumnValueSelector timeSelector,
BaseLongColumnValueSelector valueSelector)
+ public NumericFirstAggregator(BaseLongColumnValueSelector timeSelector,
TSelector valueSelector)
{
- this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
firstTime = Long.MAX_VALUE;
- firstValue = 0;
+ rhsNull = !useDefault;
}
+ /**
+ * Store the current primitive typed 'first' value
+ */
+ abstract void setCurrentValue();
+
@Override
public void aggregate()
{
long time = timeSelector.getLong();
if (time < firstTime) {
firstTime = time;
- firstValue = valueSelector.getLong();
+ if (useDefault || !valueSelector.isNull()) {
+ setCurrentValue();
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
@Override
- public Object get()
- {
- return new SerializablePair<>(firstTime, firstValue);
- }
-
- @Override
- public float getFloat()
- {
- return (float) firstValue;
- }
-
- @Override
- public double getDouble()
- {
- return (double) firstValue;
- }
-
- @Override
- public long getLong()
- {
- return firstValue;
- }
-
- @Override
public void close()
{
-
+ // nothing to close
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
similarity index 50%
copy from
processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
copy to
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
index e917c68..ebb0a87 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
@@ -19,68 +19,82 @@
package org.apache.druid.query.aggregation.first;
-import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseNullableColumnValueSelector;
import java.nio.ByteBuffer;
-public class DoubleFirstBufferAggregator implements BufferAggregator
+/**
+ * Base type for buffer based 'first' aggregator for primitive numeric column selectors
+ */
+public abstract class NumericFirstBufferAggregator<TSelector extends
BaseNullableColumnValueSelector>
+ implements BufferAggregator
{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+
+ private final boolean useDefault = NullHandling.replaceWithDefault();
private final BaseLongColumnValueSelector timeSelector;
- private final BaseDoubleColumnValueSelector valueSelector;
- public DoubleFirstBufferAggregator(
- BaseLongColumnValueSelector timeSelector,
- BaseDoubleColumnValueSelector valueSelector
- )
+ final TSelector valueSelector;
+
+ public NumericFirstBufferAggregator(BaseLongColumnValueSelector
timeSelector, TSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}
- @Override
- public void init(ByteBuffer buf, int position)
- {
- buf.putLong(position, Long.MAX_VALUE);
- buf.putDouble(position + Long.BYTES, 0);
- }
+ /**
+ * Initialize the buffer value at the position of {@link #VALUE_OFFSET}
+ */
+ abstract void initValue(ByteBuffer buf, int position);
- @Override
- public void aggregate(ByteBuffer buf, int position)
+ /**
+ * Place the primitive value in the buffer at the position of {@link #VALUE_OFFSET}
+ */
+ abstract void putValue(ByteBuffer buf, int position);
+
+ void updateTimeWithValue(ByteBuffer buf, int position, long time)
{
- long time = timeSelector.getLong();
- long firstTime = buf.getLong(position);
- if (time < firstTime) {
- buf.putLong(position, time);
- buf.putDouble(position + Long.BYTES, valueSelector.getDouble());
- }
+ buf.putLong(position, time);
+ buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
+ putValue(buf, position + VALUE_OFFSET);
}
- @Override
- public Object get(ByteBuffer buf, int position)
+ void updateTimeWithNull(ByteBuffer buf, int position, long time)
{
- return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Long.BYTES));
+ buf.putLong(position, time);
+ buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE);
}
- @Override
- public float getFloat(ByteBuffer buf, int position)
+ boolean isValueNull(ByteBuffer buf, int position)
{
- return (float) buf.getDouble(position + Long.BYTES);
+ return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
}
@Override
- public long getLong(ByteBuffer buf, int position)
+ public void init(ByteBuffer buf, int position)
{
- return (long) buf.getDouble(position + Long.BYTES);
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
}
@Override
- public double getDouble(ByteBuffer buf, int position)
+ public void aggregate(ByteBuffer buf, int position)
{
- return buf.getDouble(position + Long.BYTES);
+ long time = timeSelector.getLong();
+ long firstTime = buf.getLong(position);
+ if (time < firstTime) {
+ if (useDefault || !valueSelector.isNull()) {
+ updateTimeWithValue(buf, position, time);
+ } else {
+ updateTimeWithNull(buf, position, time);
+ }
+ }
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
index 2d5ee99..5b581d5 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
@@ -64,7 +64,7 @@ public class StringFirstAggregator implements Aggregator
valueSelector
);
- if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
+ if (inPair != null && inPair.lhs < firstTime) {
firstTime = inPair.lhs;
firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
}
@@ -73,11 +73,8 @@ public class StringFirstAggregator implements Aggregator
if (time < firstTime) {
final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
-
- if (value != null) {
- firstTime = time;
- firstValue = StringUtils.fastLooseChop(value, maxStringBytes);
- }
+ firstTime = time;
+ firstValue = StringUtils.fastLooseChop(value, maxStringBytes);
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 6ad8558..ada3698 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -34,9 +34,11 @@ import
org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -47,6 +49,34 @@ import java.util.Objects;
@JsonTypeName("stringFirst")
public class StringFirstAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new StringFirstAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance(),
+ 0,
+ false
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
StringFirstBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance(),
+ 0,
+ false
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
+
public static final int DEFAULT_MAX_STRING_SIZE = 1024;
public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
@@ -120,24 +150,32 @@ public class StringFirstAggregatorFactory extends
AggregatorFactory
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
- return new StringFirstAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- valueSelector,
- maxStringBytes,
- StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
- );
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new StringFirstAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
+ );
+ }
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
- return new StringFirstBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- valueSelector,
- maxStringBytes,
- StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
- );
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new StringFirstBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
+ );
+ }
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
index b7d5ac8..d84793e 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -71,7 +71,7 @@ public class StringFirstBufferAggregator implements
BufferAggregator
valueSelector
);
- if (inPair != null && inPair.rhs != null) {
+ if (inPair != null) {
final long firstTime = buf.getLong(position);
if (inPair.lhs < firstTime) {
StringFirstLastUtils.writePair(
@@ -89,14 +89,12 @@ public class StringFirstBufferAggregator implements
BufferAggregator
if (time < firstTime) {
final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
- if (value != null) {
- StringFirstLastUtils.writePair(
- buf,
- position,
- new SerializablePairLongString(time, value),
- maxStringBytes
- );
- }
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(time, value),
+ maxStringBytes
+ );
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
index d6678d7..3f6a150 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
@@ -20,42 +20,29 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
-public class DoubleLastAggregator implements Aggregator
+public class DoubleLastAggregator extends
NumericLastAggregator<BaseDoubleColumnValueSelector>
{
-
- private final BaseDoubleColumnValueSelector valueSelector;
- private final BaseLongColumnValueSelector timeSelector;
-
- protected long lastTime;
- protected double lastValue;
+ double lastValue;
public DoubleLastAggregator(BaseLongColumnValueSelector timeSelector,
BaseDoubleColumnValueSelector valueSelector)
{
- this.valueSelector = valueSelector;
- this.timeSelector = timeSelector;
-
- lastTime = Long.MIN_VALUE;
+ super(timeSelector, valueSelector);
lastValue = 0;
}
@Override
- public void aggregate()
+ void setCurrentValue()
{
- long time = timeSelector.getLong();
- if (time >= lastTime) {
- lastTime = time;
- lastValue = valueSelector.getDouble();
- }
+ lastValue = valueSelector.getDouble();
}
@Override
public Object get()
{
- return new SerializablePair<>(lastTime, lastValue);
+ return new SerializablePair<>(lastTime, rhsNull ? null : lastValue);
}
@Override
@@ -75,10 +62,4 @@ public class DoubleLastAggregator implements Aggregator
{
return lastValue;
}
-
- @Override
- public void close()
- {
-
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
index ad90857..dd2503e 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
@@ -30,12 +30,13 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -47,8 +48,31 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class DoubleLastAggregatorFactory extends
NullableNumericAggregatorFactory<ColumnValueSelector>
+public class DoubleLastAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new DoubleLastAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
DoubleLastBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
private final String fieldName;
private final String name;
@@ -68,24 +92,31 @@ public class DoubleLastAggregatorFactory extends
NullableNumericAggregatorFactor
}
@Override
- protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
- {
- return metricFactory.makeColumnValueSelector(fieldName);
- }
-
- @Override
- protected Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return new DoubleLastAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), selector);
+ final BaseDoubleColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new DoubleLastAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
- protected BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new DoubleLastBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- selector
- );
+ final BaseDoubleColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new DoubleLastBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
@@ -125,35 +156,54 @@ public class DoubleLastAggregatorFactory extends
NullableNumericAggregatorFactor
return new DoubleLastAggregatorFactory(name, name)
{
@Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Double>> selector =
+ metricFactory.makeColumnValueSelector(name);
return new DoubleLastAggregator(null, null)
{
@Override
public void aggregate()
{
- SerializablePair<Long, Double> pair = (SerializablePair<Long,
Double>) selector.getObject();
+ SerializablePair<Long, Double> pair = selector.getObject();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
- lastValue = pair.rhs;
+ if (pair.rhs != null) {
+ lastValue = pair.rhs;
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
};
}
@Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Double>> selector =
+ metricFactory.makeColumnValueSelector(name);
return new DoubleLastBufferAggregator(null, null)
{
@Override
+ public void putValue(ByteBuffer buf, int position)
+ {
+ SerializablePair<Long, Double> pair = selector.getObject();
+ buf.putDouble(position, pair.rhs);
+ }
+
+ @Override
public void aggregate(ByteBuffer buf, int position)
{
- SerializablePair<Long, Double> pair = (SerializablePair<Long,
Double>) selector.getObject();
+ SerializablePair<Long, Double> pair = selector.getObject();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
- buf.putLong(position, pair.lhs);
- buf.putDouble(position + Long.BYTES, pair.rhs);
+ if (pair.rhs != null) {
+ updateTimeWithValue(buf, position, pair.lhs);
+ } else {
+ updateTimeWithNull(buf, position, pair.lhs);
+ }
}
}
@@ -177,6 +227,9 @@ public class DoubleLastAggregatorFactory extends
NullableNumericAggregatorFactor
public Object deserialize(Object object)
{
Map map = (Map) object;
+ if (map.get("rhs") == null) {
+ return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
null);
+ }
return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).doubleValue());
}
@@ -220,17 +273,15 @@ public class DoubleLastAggregatorFactory extends
NullableNumericAggregatorFactor
@Override
public String getTypeName()
{
-
- if (storeDoubleAsFloat) {
- return "float";
- }
- return "double";
+ // if we don't pretend to be a primitive, group by v1 gets sad and doesn't
work because no complex type serde
+ return storeDoubleAsFloat ? "float" : "double";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES + Double.BYTES;
+ // timestamp, is null, value
+ return Long.BYTES + Byte.BYTES + Double.BYTES;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java
index b389b5f..8acddce 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java
@@ -20,79 +20,55 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
-public class DoubleLastBufferAggregator implements BufferAggregator
+public class DoubleLastBufferAggregator extends
NumericLastBufferAggregator<BaseDoubleColumnValueSelector>
{
- private final BaseLongColumnValueSelector timeSelector;
- private final BaseDoubleColumnValueSelector valueSelector;
-
public DoubleLastBufferAggregator(
BaseLongColumnValueSelector timeSelector,
BaseDoubleColumnValueSelector valueSelector
)
{
- this.timeSelector = timeSelector;
- this.valueSelector = valueSelector;
+ super(timeSelector, valueSelector);
}
@Override
- public void init(ByteBuffer buf, int position)
+ void initValue(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MIN_VALUE);
- buf.putDouble(position + Long.BYTES, 0);
+ buf.putDouble(position, 0);
}
@Override
- public void aggregate(ByteBuffer buf, int position)
+ void putValue(ByteBuffer buf, int position)
{
- long time = timeSelector.getLong();
- long lastTime = buf.getLong(position);
- if (time >= lastTime) {
- buf.putLong(position, time);
- buf.putDouble(position + Long.BYTES, valueSelector.getDouble());
- }
+ buf.putDouble(position, valueSelector.getDouble());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- return new SerializablePair<>(buf.getLong(position),
buf.getDouble(position + Long.BYTES));
+ final boolean rhsNull = isValueNull(buf, position);
+ return new SerializablePair<>(buf.getLong(position), rhsNull ? null :
buf.getDouble(position + VALUE_OFFSET));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
- return (float) buf.getDouble(position + Long.BYTES);
+ return (float) buf.getDouble(position + VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
- return (long) buf.getDouble(position + Long.BYTES);
+ return (long) buf.getDouble(position + VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
- return buf.getDouble(position + Long.BYTES);
- }
-
- @Override
- public void close()
- {
- // no resources to cleanup
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("timeSelector", timeSelector);
- inspector.visit("valueSelector", valueSelector);
+ return buf.getDouble(position + VALUE_OFFSET);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java
index 77e3455..1381ccb 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java
@@ -20,42 +20,29 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
-public class FloatLastAggregator implements Aggregator
+public class FloatLastAggregator extends
NumericLastAggregator<BaseFloatColumnValueSelector>
{
-
- private final BaseFloatColumnValueSelector valueSelector;
- private final BaseLongColumnValueSelector timeSelector;
-
- protected long lastTime;
- protected float lastValue;
+ float lastValue;
public FloatLastAggregator(BaseLongColumnValueSelector timeSelector,
BaseFloatColumnValueSelector valueSelector)
{
- this.valueSelector = valueSelector;
- this.timeSelector = timeSelector;
-
- lastTime = Long.MIN_VALUE;
+ super(timeSelector, valueSelector);
lastValue = 0;
}
@Override
- public void aggregate()
+ void setCurrentValue()
{
- long time = timeSelector.getLong();
- if (time >= lastTime) {
- lastTime = time;
- lastValue = valueSelector.getFloat();
- }
+ lastValue = valueSelector.getFloat();
}
@Override
public Object get()
{
- return new SerializablePair<>(lastTime, lastValue);
+ return new SerializablePair<>(lastTime, rhsNull ? null : lastValue);
}
@Override
@@ -73,12 +60,6 @@ public class FloatLastAggregator implements Aggregator
@Override
public double getDouble()
{
- return (double) lastValue;
- }
-
- @Override
- public void close()
- {
-
+ return lastValue;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
index 0e42f9f..78024f9 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
@@ -30,12 +30,13 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -47,8 +48,31 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class FloatLastAggregatorFactory extends
NullableNumericAggregatorFactory<ColumnValueSelector>
+public class FloatLastAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new FloatLastAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
FloatLastBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
private final String fieldName;
private final String name;
@@ -66,24 +90,31 @@ public class FloatLastAggregatorFactory extends
NullableNumericAggregatorFactory
}
@Override
- protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
- {
- return metricFactory.makeColumnValueSelector(fieldName);
- }
-
- @Override
- protected Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return new
FloatLastAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector);
+ final BaseFloatColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new FloatLastAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
- protected BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new FloatLastBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- selector
- );
+ final BaseFloatColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new FloatLastBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
@@ -123,35 +154,52 @@ public class FloatLastAggregatorFactory extends
NullableNumericAggregatorFactory
return new FloatLastAggregatorFactory(name, name)
{
@Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ ColumnValueSelector<SerializablePair<Long, Float>> selector =
metricFactory.makeColumnValueSelector(name);
return new FloatLastAggregator(null, null)
{
@Override
public void aggregate()
{
- SerializablePair<Long, Float> pair = (SerializablePair<Long,
Float>) selector.getObject();
+ SerializablePair<Long, Float> pair = selector.getObject();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
- lastValue = pair.rhs;
+ if (pair.rhs != null) {
+ lastValue = pair.rhs;
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
};
}
@Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ ColumnValueSelector<SerializablePair<Long, Float>> selector =
metricFactory.makeColumnValueSelector(name);
return new FloatLastBufferAggregator(null, null)
{
@Override
+ public void putValue(ByteBuffer buf, int position)
+ {
+ SerializablePair<Long, Float> pair = selector.getObject();
+ buf.putFloat(position, pair.rhs);
+ }
+
+ @Override
public void aggregate(ByteBuffer buf, int position)
{
- SerializablePair<Long, Float> pair = (SerializablePair<Long,
Float>) selector.getObject();
+ SerializablePair<Long, Float> pair = selector.getObject();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
- buf.putLong(position, pair.lhs);
- buf.putFloat(position + Long.BYTES, pair.rhs);
+ if (pair.rhs != null) {
+ updateTimeWithValue(buf, position, pair.lhs);
+ } else {
+ updateTimeWithNull(buf, position, pair.lhs);
+ }
}
}
@@ -175,6 +223,9 @@ public class FloatLastAggregatorFactory extends
NullableNumericAggregatorFactory
public Object deserialize(Object object)
{
Map map = (Map) object;
+ if (map.get("rhs") == null) {
+ return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
null);
+ }
return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).floatValue());
}
@@ -219,13 +270,15 @@ public class FloatLastAggregatorFactory extends
NullableNumericAggregatorFactory
@Override
public String getTypeName()
{
+ // if we don't pretend to be a primitive, group by v1 gets sad and doesn't
work because no complex type serde
return "float";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES + Float.BYTES;
+ // timestamp, is null, value
+ return Long.BYTES + Byte.BYTES + Float.BYTES;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java
index 59d8c0b..95ad6fe 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java
@@ -20,76 +20,55 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
-public class FloatLastBufferAggregator implements BufferAggregator
+public class FloatLastBufferAggregator extends
NumericLastBufferAggregator<BaseFloatColumnValueSelector>
{
- private final BaseLongColumnValueSelector timeSelector;
- private final BaseFloatColumnValueSelector valueSelector;
-
- public FloatLastBufferAggregator(BaseLongColumnValueSelector timeSelector,
BaseFloatColumnValueSelector valueSelector)
+ public FloatLastBufferAggregator(
+ BaseLongColumnValueSelector timeSelector,
+ BaseFloatColumnValueSelector valueSelector
+ )
{
- this.timeSelector = timeSelector;
- this.valueSelector = valueSelector;
+ super(timeSelector, valueSelector);
}
@Override
- public void init(ByteBuffer buf, int position)
+ void initValue(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MIN_VALUE);
- buf.putFloat(position + Long.BYTES, 0);
+ buf.putFloat(position, 0);
}
@Override
- public void aggregate(ByteBuffer buf, int position)
+ void putValue(ByteBuffer buf, int position)
{
- long time = timeSelector.getLong();
- long lastTime = buf.getLong(position);
- if (time >= lastTime) {
- buf.putLong(position, time);
- buf.putFloat(position + Long.BYTES, valueSelector.getFloat());
- }
+ buf.putFloat(position, valueSelector.getFloat());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- return new SerializablePair<>(buf.getLong(position), buf.getFloat(position
+ Long.BYTES));
+ final boolean rhsNull = isValueNull(buf, position);
+ return new SerializablePair<>(buf.getLong(position), rhsNull ? null :
buf.getFloat(position + VALUE_OFFSET));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
- return buf.getFloat(position + Long.BYTES);
+ return buf.getFloat(position + VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
- return (long) buf.getFloat(position + Long.BYTES);
+ return (long) buf.getFloat(position + VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
- return (double) buf.getFloat(position + Long.BYTES);
- }
-
- @Override
- public void close()
- {
- // no resources to cleanup
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("timeSelector", timeSelector);
- inspector.visit("valueSelector", valueSelector);
+ return buf.getFloat(position + VALUE_OFFSET);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java
index b44371f..59a159d 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java
@@ -20,46 +20,28 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseLongColumnValueSelector;
-public class LongLastAggregator implements Aggregator
+public class LongLastAggregator extends
NumericLastAggregator<BaseLongColumnValueSelector>
{
- private final BaseLongColumnValueSelector valueSelector;
- private final BaseLongColumnValueSelector timeSelector;
-
- protected long lastTime;
- protected long lastValue;
+ long lastValue;
public LongLastAggregator(BaseLongColumnValueSelector timeSelector,
BaseLongColumnValueSelector valueSelector)
{
- this.valueSelector = valueSelector;
- this.timeSelector = timeSelector;
-
- lastTime = Long.MIN_VALUE;
+ super(timeSelector, valueSelector);
lastValue = 0;
}
@Override
- public void aggregate()
- {
- long time = timeSelector.getLong();
- if (time >= lastTime) {
- lastTime = time;
- lastValue = valueSelector.getLong();
- }
- }
-
- @Override
- public double getDouble()
+ void setCurrentValue()
{
- return (double) lastValue;
+ lastValue = valueSelector.getLong();
}
@Override
public Object get()
{
- return new SerializablePair<>(lastTime, lastValue);
+ return new SerializablePair<>(lastTime, rhsNull ? null : lastValue);
}
@Override
@@ -69,14 +51,14 @@ public class LongLastAggregator implements Aggregator
}
@Override
- public long getLong()
+ public double getDouble()
{
- return lastValue;
+ return (double) lastValue;
}
@Override
- public void close()
+ public long getLong()
{
-
+ return lastValue;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
index 4a00a63..3cc333f 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
@@ -30,11 +30,12 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@@ -46,8 +47,32 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class LongLastAggregatorFactory extends
NullableNumericAggregatorFactory<ColumnValueSelector>
+public class LongLastAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new LongLastAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
LongLastBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance()
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
+
private final String fieldName;
private final String name;
@@ -64,21 +89,31 @@ public class LongLastAggregatorFactory extends
NullableNumericAggregatorFactory<
}
@Override
- protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
- return metricFactory.makeColumnValueSelector(fieldName);
- }
-
- @Override
- protected Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
- {
- return new
LongLastAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector);
+ final BaseLongColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new LongLastAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
- protected BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
- return new
LongLastBufferAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector);
+ final BaseLongColumnValueSelector valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new LongLastBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector
+ );
+ }
}
@Override
@@ -118,35 +153,52 @@ public class LongLastAggregatorFactory extends
NullableNumericAggregatorFactory<
return new LongLastAggregatorFactory(name, name)
{
@Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory,
ColumnValueSelector selector)
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Long>> selector =
metricFactory.makeColumnValueSelector(name);
return new LongLastAggregator(null, null)
{
@Override
public void aggregate()
{
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>)
selector.getObject();
+ SerializablePair<Long, Long> pair = selector.getObject();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
- lastValue = pair.rhs;
+ if (pair.rhs != null) {
+ lastValue = pair.rhs;
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
};
}
@Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
+ final ColumnValueSelector<SerializablePair<Long, Long>> selector =
metricFactory.makeColumnValueSelector(name);
return new LongLastBufferAggregator(null, null)
{
@Override
+ public void putValue(ByteBuffer buf, int position)
+ {
+ SerializablePair<Long, Long> pair = selector.getObject();
+ buf.putLong(position, pair.rhs);
+ }
+
+ @Override
public void aggregate(ByteBuffer buf, int position)
{
- SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>)
selector.getObject();
+ SerializablePair<Long, Long> pair = selector.getObject();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
- buf.putLong(position, pair.lhs);
- buf.putLong(position + Long.BYTES, pair.rhs);
+ if (pair.rhs != null) {
+ updateTimeWithValue(buf, position, pair.lhs);
+ } else {
+ updateTimeWithNull(buf, position, pair.lhs);
+ }
}
}
@@ -170,6 +222,9 @@ public class LongLastAggregatorFactory extends
NullableNumericAggregatorFactory<
public Object deserialize(Object object)
{
Map map = (Map) object;
+ if (map.get("rhs") == null) {
+ return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
null);
+ }
return new SerializablePair<>(((Number) map.get("lhs")).longValue(),
((Number) map.get("rhs")).longValue());
}
@@ -213,13 +268,15 @@ public class LongLastAggregatorFactory extends
NullableNumericAggregatorFactory<
@Override
public String getTypeName()
{
+ // if we don't pretend to be a primitive, group by v1 gets sad and doesn't
work because no complex type serde
return "long";
}
@Override
public int getMaxIntermediateSize()
{
- return Long.BYTES * 2;
+ // timestamp, is null, value
+ return Long.BYTES + Byte.BYTES + Long.BYTES;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java
index 490c1e4..981ba3e 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java
@@ -20,75 +20,51 @@
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
-public class LongLastBufferAggregator implements BufferAggregator
+public class LongLastBufferAggregator extends
NumericLastBufferAggregator<BaseLongColumnValueSelector>
{
- private final BaseLongColumnValueSelector timeSelector;
- private final BaseLongColumnValueSelector valueSelector;
-
public LongLastBufferAggregator(BaseLongColumnValueSelector timeSelector,
BaseLongColumnValueSelector valueSelector)
{
- this.timeSelector = timeSelector;
- this.valueSelector = valueSelector;
+ super(timeSelector, valueSelector);
}
@Override
- public void init(ByteBuffer buf, int position)
+ void initValue(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MIN_VALUE);
- buf.putLong(position + Long.BYTES, 0);
+ buf.putLong(position, 0);
}
@Override
- public void aggregate(ByteBuffer buf, int position)
+ void putValue(ByteBuffer buf, int position)
{
- long time = timeSelector.getLong();
- long lastTime = buf.getLong(position);
- if (time >= lastTime) {
- buf.putLong(position, time);
- buf.putLong(position + Long.BYTES, valueSelector.getLong());
- }
+ buf.putLong(position, valueSelector.getLong());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- return new SerializablePair<>(buf.getLong(position), buf.getLong(position
+ Long.BYTES));
+ boolean rhsNull = isValueNull(buf, position);
+ return new SerializablePair<>(buf.getLong(position), rhsNull ? null :
buf.getLong(position + VALUE_OFFSET));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
- return (float) buf.getLong(position + Long.BYTES);
+ return (float) buf.getLong(position + VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
- return buf.getLong(position + Long.BYTES);
+ return buf.getLong(position + VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
- return buf.getLong(position + Long.BYTES);
- }
-
- @Override
- public void close()
- {
- // no resources to cleanup
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("timeSelector", timeSelector);
- inspector.visit("valueSelector", valueSelector);
+ return buf.getLong(position + VALUE_OFFSET);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
similarity index 57%
copy from
processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
copy to
processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
index d6678d7..6506f97 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
@@ -19,27 +19,32 @@
package org.apache.druid.query.aggregation.last;
-import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseNullableColumnValueSelector;
-public class DoubleLastAggregator implements Aggregator
+/**
+ * Base type for on-heap 'last' aggregator for primitive numeric column
selectors.
+ *
+ * This could probably share a base class with {@link
org.apache.druid.query.aggregation.first.NumericFirstAggregator}
+ */
+public abstract class NumericLastAggregator<TSelector extends
BaseNullableColumnValueSelector> implements Aggregator
{
-
- private final BaseDoubleColumnValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
private final BaseLongColumnValueSelector timeSelector;
- protected long lastTime;
- protected double lastValue;
+ final TSelector valueSelector;
+ long lastTime;
+ boolean rhsNull;
- public DoubleLastAggregator(BaseLongColumnValueSelector timeSelector,
BaseDoubleColumnValueSelector valueSelector)
+ public NumericLastAggregator(BaseLongColumnValueSelector timeSelector,
TSelector valueSelector)
{
- this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
lastTime = Long.MIN_VALUE;
- lastValue = 0;
+ rhsNull = !useDefault;
}
@Override
@@ -48,37 +53,23 @@ public class DoubleLastAggregator implements Aggregator
long time = timeSelector.getLong();
if (time >= lastTime) {
lastTime = time;
- lastValue = valueSelector.getDouble();
+ if (useDefault || !valueSelector.isNull()) {
+ setCurrentValue();
+ rhsNull = false;
+ } else {
+ rhsNull = true;
+ }
}
}
@Override
- public Object get()
- {
- return new SerializablePair<>(lastTime, lastValue);
- }
-
- @Override
- public float getFloat()
- {
- return (float) lastValue;
- }
-
- @Override
- public long getLong()
- {
- return (long) lastValue;
- }
-
- @Override
- public double getDouble()
- {
- return lastValue;
- }
-
- @Override
public void close()
{
-
+ // nothing to close
}
+
+ /**
+ * Store the current primitive typed 'last' value
+ */
+ abstract void setCurrentValue();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
new file mode 100644
index 0000000..7c90aad
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.last;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseNullableColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Base type for buffer based 'last' aggregator for primitive numeric column
selectors
+ *
+ * This could probably share a base type with
+ * {@link
org.apache.druid.query.aggregation.first.NumericFirstBufferAggregator} ...
+ */
+public abstract class NumericLastBufferAggregator<TSelector extends
BaseNullableColumnValueSelector>
+ implements BufferAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final BaseLongColumnValueSelector timeSelector;
+
+ final TSelector valueSelector;
+
+ public NumericLastBufferAggregator(BaseLongColumnValueSelector timeSelector,
TSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ }
+
+ /**
+ * Initialize the buffer value at the position of {@link #VALUE_OFFSET}
+ */
+ abstract void initValue(ByteBuffer buf, int position);
+
+ /**
+ * Place the primitive value in the buffer at the position of {@link
#VALUE_OFFSET}
+ */
+ abstract void putValue(ByteBuffer buf, int position);
+
+ boolean isValueNull(ByteBuffer buf, int position)
+ {
+ return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
+ }
+
+ void updateTimeWithValue(ByteBuffer buf, int position, long time)
+ {
+ buf.putLong(position, time);
+ buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
+ putValue(buf, position + VALUE_OFFSET);
+ }
+
+ void updateTimeWithNull(ByteBuffer buf, int position, long time)
+ {
+ buf.putLong(position, time);
+ buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE);
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MIN_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ long time = timeSelector.getLong();
+ long lastTime = buf.getLong(position);
+ if (time >= lastTime) {
+ if (useDefault || !valueSelector.isNull()) {
+ updateTimeWithValue(buf, position, time);
+ } else {
+ updateTimeWithNull(buf, position, time);
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ // no resources to cleanup
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("timeSelector", timeSelector);
+ inspector.visit("valueSelector", valueSelector);
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
index ea37ff4..0c5fe33 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
@@ -65,7 +65,7 @@ public class StringLastAggregator implements Aggregator
valueSelector
);
- if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
+ if (inPair != null && inPair.lhs >= lastTime) {
lastTime = inPair.lhs;
lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
}
@@ -74,11 +74,8 @@ public class StringLastAggregator implements Aggregator
if (time >= lastTime) {
final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
-
- if (value != null) {
- lastTime = time;
- lastValue = StringUtils.fastLooseChop(value, maxStringBytes);
- }
+ lastTime = time;
+ lastValue = StringUtils.fastLooseChop(value, maxStringBytes);
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index 9a3264f..e97f8b5 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -36,9 +36,11 @@ import
org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -48,6 +50,34 @@ import java.util.Objects;
@JsonTypeName("stringLast")
public class StringLastAggregatorFactory extends AggregatorFactory
{
+ private static final Aggregator NIL_AGGREGATOR = new StringLastAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance(),
+ 0,
+ false
+ )
+ {
+ @Override
+ public void aggregate()
+ {
+ // no-op
+ }
+ };
+
+ private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new
StringLastBufferAggregator(
+ NilColumnValueSelector.instance(),
+ NilColumnValueSelector.instance(),
+ 0,
+ false
+ )
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ // no-op
+ }
+ };
+
private final String fieldName;
private final String name;
protected final int maxStringBytes;
@@ -77,24 +107,32 @@ public class StringLastAggregatorFactory extends
AggregatorFactory
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
- return new StringLastAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- valueSelector,
- maxStringBytes,
- StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
- );
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_AGGREGATOR;
+ } else {
+ return new StringLastAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
+ );
+ }
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory
metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector =
metricFactory.makeColumnValueSelector(fieldName);
- return new StringLastBufferAggregator(
- metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
- valueSelector,
- maxStringBytes,
- StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
- );
+ if (valueSelector instanceof NilColumnValueSelector) {
+ return NIL_BUFFER_AGGREGATOR;
+ } else {
+ return new StringLastBufferAggregator(
+ metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+ valueSelector,
+ maxStringBytes,
+ StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector,
metricFactory.getColumnCapabilities(fieldName))
+ );
+ }
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
index 09e3276..9da9852 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -72,7 +72,7 @@ public class StringLastBufferAggregator implements
BufferAggregator
valueSelector
);
- if (inPair != null && inPair.rhs != null) {
+ if (inPair != null) {
final long lastTime = buf.getLong(position);
if (inPair.lhs >= lastTime) {
StringFirstLastUtils.writePair(
@@ -90,14 +90,12 @@ public class StringLastBufferAggregator implements
BufferAggregator
if (time >= lastTime) {
final String value =
DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
- if (value != null) {
- StringFirstLastUtils.writePair(
- buf,
- position,
- new SerializablePairLongString(time, value),
- maxStringBytes
- );
- }
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(time, value),
+ maxStringBytes
+ );
}
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
index 19ceaa5..c85b6b1 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
@@ -30,14 +30,16 @@ import
org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Comparator;
-public class DoubleFirstAggregationTest
+public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
{
private DoubleFirstAggregatorFactory doubleFirstAggFactory;
private DoubleFirstAggregatorFactory combiningAggFactory;
@@ -119,6 +121,31 @@ public class DoubleFirstAggregationTest
}
@Test
+ public void testComparator()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
+ Comparator comparator = doubleFirstAggFactory.getComparator();
+ Assert.assertEquals(-1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(1, comparator.compare(pair2, pair1));
+ }
+
+ @Test
+ public void testComparatorWithNulls()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, null);
+ Comparator comparator = doubleFirstAggFactory.getComparator();
+ Assert.assertEquals(1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(-1, comparator.compare(pair2, pair1));
+ }
+
+
+ @Test
public void testDoubleFirstCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
index 1e9721d..b9b37f8 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
@@ -30,14 +30,16 @@ import
org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Comparator;
-public class FloatFirstAggregationTest
+public class FloatFirstAggregationTest extends InitializedNullHandlingTest
{
private FloatFirstAggregatorFactory floatFirstAggregatorFactory;
private FloatFirstAggregatorFactory combiningAggFactory;
@@ -65,8 +67,8 @@ public class FloatFirstAggregationTest
objectSelector = new TestObjectColumnSelector<>(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
-
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
-
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector).atLeastOnce();
+
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector).atLeastOnce();
EasyMock.replay(colSelectorFactory);
}
@@ -113,12 +115,24 @@ public class FloatFirstAggregationTest
@Test
public void testCombine()
{
- SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
- SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4f);
Assert.assertEquals(pair1, floatFirstAggregatorFactory.combine(pair1,
pair2));
}
@Test
+ public void testComparatorWithNulls()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, null);
+ Comparator comparator = floatFirstAggregatorFactory.getComparator();
+ Assert.assertEquals(1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(-1, comparator.compare(pair2, pair1));
+ }
+
+ @Test
public void testDoubleFirstCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
index 4f3df59..7fe9256 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
@@ -29,14 +29,16 @@ import
org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Comparator;
-public class LongFirstAggregationTest
+public class LongFirstAggregationTest extends InitializedNullHandlingTest
{
private LongFirstAggregatorFactory longFirstAggFactory;
private LongFirstAggregatorFactory combiningAggFactory;
@@ -118,6 +120,18 @@ public class LongFirstAggregationTest
}
@Test
+ public void testComparatorWithNulls()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, null);
+ Comparator comparator = longFirstAggFactory.getComparator();
+ Assert.assertEquals(1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(-1, comparator.compare(pair2, pair1));
+ }
+
+ @Test
public void testLongFirstCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
index 3b4ef69..8d88677 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -189,7 +189,7 @@ public class StringFirstBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf,
position));
- Assert.assertEquals(1526724600L, (long) sp.lhs);
- Assert.assertEquals("2.0", sp.rhs);
+ Assert.assertEquals(1526724000L, (long) sp.lhs);
+ Assert.assertEquals(null, sp.rhs);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 3ae8384..445c6c9 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -43,6 +43,7 @@ import
org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +51,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
-public class StringFirstTimeseriesQueryTest
+public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
{
private static final String VISITOR_ID = "visitor_id";
private static final String CLIENT_TYPE = "client_type";
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
index 21d6cad..847194f 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
@@ -30,14 +30,16 @@ import
org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Comparator;
-public class DoubleLastAggregationTest
+public class DoubleLastAggregationTest extends InitializedNullHandlingTest
{
private DoubleLastAggregatorFactory doubleLastAggFactory;
private DoubleLastAggregatorFactory combiningAggFactory;
@@ -119,6 +121,18 @@ public class DoubleLastAggregationTest
}
@Test
+ public void testComparatorWithNulls()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, null);
+ Comparator comparator = doubleLastAggFactory.getComparator();
+ Assert.assertEquals(1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(-1, comparator.compare(pair2, pair1));
+ }
+
+ @Test
public void testDoubleLastCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
index f094ecf..86b6a99 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
@@ -30,14 +30,16 @@ import
org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Comparator;
-public class FloatLastAggregationTest
+public class FloatLastAggregationTest extends InitializedNullHandlingTest
{
private FloatLastAggregatorFactory floatLastAggregatorFactory;
private FloatLastAggregatorFactory combiningAggFactory;
@@ -113,12 +115,24 @@ public class FloatLastAggregationTest
@Test
public void testCombine()
{
- SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
- SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4f);
Assert.assertEquals(pair2, floatLastAggregatorFactory.combine(pair1,
pair2));
}
@Test
+ public void testComparatorWithNulls()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, null);
+ Comparator comparator = floatLastAggregatorFactory.getComparator();
+ Assert.assertEquals(1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(-1, comparator.compare(pair2, pair1));
+ }
+
+ @Test
public void testDoubleLastCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
index 1324bee..a9b2fad 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
@@ -29,14 +29,16 @@ import
org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Comparator;
-public class LongLastAggregationTest
+public class LongLastAggregationTest extends InitializedNullHandlingTest
{
private LongLastAggregatorFactory longLastAggFactory;
private LongLastAggregatorFactory combiningAggFactory;
@@ -118,6 +120,18 @@ public class LongLastAggregationTest
}
@Test
+ public void testComparatorWithNulls()
+ {
+ SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L);
+ SerializablePair pair2 = new SerializablePair<>(1467240000L, null);
+ Comparator comparator = longLastAggFactory.getComparator();
+ Assert.assertEquals(1, comparator.compare(pair1, pair2));
+ Assert.assertEquals(0, comparator.compare(pair1, pair1));
+ Assert.assertEquals(0, comparator.compare(pair2, pair2));
+ Assert.assertEquals(-1, comparator.compare(pair2, pair1));
+ }
+
+ @Test
public void testLongLastCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1cc2161..a94ef2b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -1254,7 +1254,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
- new Object[]{1L, 1.0f, NullHandling.sqlCompatible() ? "" : "10.1",
2L, 2.0f, "1"}
+ new Object[]{1L, 1.0f, "", 2L, 2.0f, "1"}
)
);
}
@@ -1465,12 +1465,407 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
.build()
),
ImmutableList.of(
- new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
+ // default mode subquery results:
+ //[, 10.1]
+ //[a, ]
+ //[abc, def]
+ // sql compatible mode subquery results:
+ //[null, 10.1]
+ //[, 2]
+ //[a, ]
+ //[abc, def]
+ new Object[]{NullHandling.sqlCompatible() ? 12.1 : 10.1}
)
);
}
@Test
+ public void testEarliestAggregatorsNumericNulls() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .aggregators(
+ aggregators(
+ new LongFirstAggregatorFactory("a0", "l1"),
+ new DoubleFirstAggregatorFactory("a1", "d1"),
+ new FloatFirstAggregatorFactory("a2", "f1")
+ )
+ )
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{7L, 1.0, 1.0f}
+ )
+ );
+ }
+
+ @Test
+ public void testLatestAggregatorsNumericNull() throws Exception
+ {
+ // Cannot vectorize LATEST aggregator.
+ skipVectorize();
+
+ testQuery(
+ "SELECT LATEST(l1), LATEST(d1), LATEST(f1) FROM druid.numfoo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .aggregators(
+ aggregators(
+ new LongLastAggregatorFactory("a0", "l1"),
+ new DoubleLastAggregatorFactory("a1", "d1"),
+ new FloatLastAggregatorFactory("a2", "f1")
+ )
+ )
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NullHandling.defaultLongValue(),
NullHandling.defaultDoubleValue(), NullHandling.defaultFloatValue()}
+ )
+ );
+ }
+
+ @Test
+ public void testFirstLatestAggregatorsSkipNulls() throws Exception
+ {
+ // Cannot vectorize EARLIEST or LATEST aggregators.
+ skipVectorize();
+
+ final DimFilter filter;
+ if (useDefault) {
+ filter = not(selector("dim1", null, null));
+ } else {
+ filter = and(
+ not(selector("dim1", null, null)),
+ not(selector("l1", null, null)),
+ not(selector("d1", null, null)),
+ not(selector("f1", null, null))
+ );
+ }
+ testQuery(
+ "SELECT EARLIEST(dim1, 32), LATEST(l1), LATEST(d1), LATEST(f1) "
+ + "FROM druid.numfoo "
+ + "WHERE dim1 IS NOT NULL AND l1 IS NOT NULL AND d1 IS NOT NULL AND f1
is NOT NULL",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .filters(filter)
+ .aggregators(
+ aggregators(
+ new StringFirstAggregatorFactory("a0", "dim1", 32),
+ new LongLastAggregatorFactory("a1", "l1"),
+ new DoubleLastAggregatorFactory("a2", "d1"),
+ new FloatLastAggregatorFactory("a3", "f1")
+ )
+ )
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // first row of dim1 is empty string, which is null in default
mode, last non-null numeric rows are zeros
+ new Object[]{useDefault ? "10.1" : "", 0L, 0.0, 0.0f}
+ )
+ );
+ }
+
+ @Test
+ public void testOrderByEarliestFloat() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+ List<Object[]> expected;
+ if (NullHandling.replaceWithDefault()) {
+ expected = ImmutableList.of(
+ new Object[]{"1", 0.0f},
+ new Object[]{"2", 0.0f},
+ new Object[]{"abc", 0.0f},
+ new Object[]{"def", 0.0f},
+ new Object[]{"10.1", 0.1f},
+ new Object[]{"", 1.0f}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{"1", null},
+ new Object[]{"abc", null},
+ new Object[]{"def", null},
+ new Object[]{"2", 0.0f},
+ new Object[]{"10.1", 0.1f},
+ new Object[]{"", 1.0f}
+ );
+ }
+ testQuery(
+ "SELECT dim1, EARLIEST(f1) FROM druid.numfoo GROUP BY 1 ORDER BY 2
LIMIT 10",
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .dimension(new DefaultDimensionSpec("dim1", "_d0"))
+ .aggregators(
+ aggregators(
+ new FloatFirstAggregatorFactory("a0", "f1")
+ )
+ )
+ .metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
+ .threshold(10)
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testOrderByEarliestDouble() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+ List<Object[]> expected;
+ if (NullHandling.replaceWithDefault()) {
+ expected = ImmutableList.of(
+ new Object[]{"1", 0.0},
+ new Object[]{"2", 0.0},
+ new Object[]{"abc", 0.0},
+ new Object[]{"def", 0.0},
+ new Object[]{"", 1.0},
+ new Object[]{"10.1", 1.7}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{"1", null},
+ new Object[]{"abc", null},
+ new Object[]{"def", null},
+ new Object[]{"2", 0.0},
+ new Object[]{"", 1.0},
+ new Object[]{"10.1", 1.7}
+ );
+ }
+ testQuery(
+ "SELECT dim1, EARLIEST(d1) FROM druid.numfoo GROUP BY 1 ORDER BY 2
LIMIT 10",
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .dimension(new DefaultDimensionSpec("dim1", "_d0"))
+ .aggregators(
+ aggregators(
+ new DoubleFirstAggregatorFactory("a0", "d1")
+ )
+ )
+ .metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
+ .threshold(10)
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testOrderByEarliestLong() throws Exception
+ {
+ // Cannot vectorize EARLIEST aggregator.
+ skipVectorize();
+ List<Object[]> expected;
+ if (NullHandling.replaceWithDefault()) {
+ expected = ImmutableList.of(
+ new Object[]{"1", 0L},
+ new Object[]{"2", 0L},
+ new Object[]{"abc", 0L},
+ new Object[]{"def", 0L},
+ new Object[]{"", 7L},
+ new Object[]{"10.1", 325323L}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{"1", null},
+ new Object[]{"abc", null},
+ new Object[]{"def", null},
+ new Object[]{"2", 0L},
+ new Object[]{"", 7L},
+ new Object[]{"10.1", 325323L}
+ );
+ }
+ testQuery(
+ "SELECT dim1, EARLIEST(l1) FROM druid.numfoo GROUP BY 1 ORDER BY 2
LIMIT 10",
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .dimension(new DefaultDimensionSpec("dim1", "_d0"))
+ .aggregators(
+ aggregators(
+ new LongFirstAggregatorFactory("a0", "l1")
+ )
+ )
+ .metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
+ .threshold(10)
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testOrderByLatestFloat() throws Exception
+ {
+ // Cannot vectorize LATEST aggregator.
+ skipVectorize();
+ List<Object[]> expected;
+ if (NullHandling.replaceWithDefault()) {
+ expected = ImmutableList.of(
+ new Object[]{"1", 0.0f},
+ new Object[]{"2", 0.0f},
+ new Object[]{"abc", 0.0f},
+ new Object[]{"def", 0.0f},
+ new Object[]{"10.1", 0.1f},
+ new Object[]{"", 1.0f}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{"1", null},
+ new Object[]{"abc", null},
+ new Object[]{"def", null},
+ new Object[]{"2", 0.0f},
+ new Object[]{"10.1", 0.1f},
+ new Object[]{"", 1.0f}
+ );
+ }
+
+ testQuery(
+ "SELECT dim1, LATEST(f1) FROM druid.numfoo GROUP BY 1 ORDER BY 2 LIMIT
10",
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .dimension(new DefaultDimensionSpec("dim1", "_d0"))
+ .aggregators(
+ aggregators(
+ new FloatLastAggregatorFactory("a0", "f1")
+ )
+ )
+ .metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
+ .threshold(10)
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testOrderByLatestDouble() throws Exception
+ {
+ // Cannot vectorize LATEST aggregator.
+ skipVectorize();
+ List<Object[]> expected;
+ if (NullHandling.replaceWithDefault()) {
+ expected = ImmutableList.of(
+ new Object[]{"1", 0.0},
+ new Object[]{"2", 0.0},
+ new Object[]{"abc", 0.0},
+ new Object[]{"def", 0.0},
+ new Object[]{"", 1.0},
+ new Object[]{"10.1", 1.7}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{"1", null},
+ new Object[]{"abc", null},
+ new Object[]{"def", null},
+ new Object[]{"2", 0.0},
+ new Object[]{"", 1.0},
+ new Object[]{"10.1", 1.7}
+ );
+ }
+ testQuery(
+ "SELECT dim1, LATEST(d1) FROM druid.numfoo GROUP BY 1 ORDER BY 2 LIMIT
10",
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .dimension(new DefaultDimensionSpec("dim1", "_d0"))
+ .aggregators(
+ aggregators(
+ new DoubleLastAggregatorFactory("a0", "d1")
+ )
+ )
+ .metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
+ .threshold(10)
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testOrderByLatestLong() throws Exception
+ {
+ // Cannot vectorize LATEST aggregator.
+ skipVectorize();
+ List<Object[]> expected;
+ if (NullHandling.replaceWithDefault()) {
+ expected = ImmutableList.of(
+ new Object[]{"1", 0L},
+ new Object[]{"2", 0L},
+ new Object[]{"abc", 0L},
+ new Object[]{"def", 0L},
+ new Object[]{"", 7L},
+ new Object[]{"10.1", 325323L}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{"1", null},
+ new Object[]{"abc", null},
+ new Object[]{"def", null},
+ new Object[]{"2", 0L},
+ new Object[]{"", 7L},
+ new Object[]{"10.1", 325323L}
+ );
+ }
+ testQuery(
+ "SELECT dim1, LATEST(l1) FROM druid.numfoo GROUP BY 1 ORDER BY 2 LIMIT
10",
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .dimension(new DefaultDimensionSpec("dim1", "_d0"))
+ .aggregators(
+ aggregators(
+ new LongLastAggregatorFactory("a0", "l1")
+ )
+ )
+ .metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
+ .threshold(10)
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
public void testGroupByLong() throws Exception
{
testQuery(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]