This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b64bd81de1 [timeseries] Part-3.1: Add Support for Partial Aggregate
and Complex Intermediate Type (#14631)
b64bd81de1 is described below
commit b64bd81de15859407f18a802573d667070de8768
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Dec 17 09:45:29 2024 -0600
[timeseries] Part-3.1: Add Support for Partial Aggregate and Complex
Intermediate Type (#14631)
* [timeseries] Add Support for Partial Aggregation and Complex Intermediate
Type
* Fix tests + add tests + cleanup
* address feedback
* add missing todo
---
.../response/PinotBrokerTimeSeriesResponse.java | 2 +-
.../TimeSeriesAggregationOperatorTest.java | 2 +-
.../core/query/executor/QueryExecutorTest.java | 14 +--
.../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 3 +-
.../tsdb/m3ql/operator/KeepLastValueOperator.java | 2 +-
.../tsdb/m3ql/operator/TransformNullOperator.java | 2 +-
.../timeseries/serde/TimeSeriesBlockSerde.java | 124 +++++++++++++++++++--
.../PhysicalTimeSeriesServerPlanVisitorTest.java | 2 +-
.../TimeSeriesExchangeReceiveOperatorTest.java | 12 +-
.../timeseries/serde/TimeSeriesBlockSerdeTest.java | 43 ++++++-
.../tsdb/planner/TimeSeriesPlanFragmenter.java | 13 ++-
.../java/org/apache/pinot/tsdb/spi/AggInfo.java | 30 ++++-
.../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 5 +
.../tsdb/spi/series/BaseTimeSeriesBuilder.java | 18 +--
.../apache/pinot/tsdb/spi/series/TimeSeries.java | 19 +++-
.../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 9 +-
.../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 4 +-
.../pinot/tsdb/spi/series/TimeSeriesTest.java | 54 +++++++++
18 files changed, 301 insertions(+), 57 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 96320b8326..4a1f347d16 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -118,7 +118,7 @@ public class PinotBrokerTimeSeriesResponse {
for (TimeSeries timeSeries : listOfTimeSeries) {
Object[][] values = new Object[timeValues.length][];
for (int i = 0; i < timeValues.length; i++) {
- Object nullableValue = timeSeries.getValues()[i];
+ Object nullableValue = timeSeries.getDoubleValues()[i];
values[i] = new Object[]{timeValues[i], nullableValue == null ? null
: nullableValue.toString()};
}
result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
index eea81a4ba1..b6e97c849f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
@@ -44,7 +44,7 @@ public class TimeSeriesAggregationOperatorTest {
private static final Random RANDOM = new Random();
private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
private static final String GROUP_BY_COLUMN = "city";
- private static final AggInfo AGG_INFO = new AggInfo("SUM",
Collections.emptyMap());
+ private static final AggInfo AGG_INFO = new AggInfo("SUM", false,
Collections.emptyMap());
private static final ExpressionContext VALUE_EXPRESSION =
ExpressionContext.forIdentifier("someValueColumn");
private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000,
Duration.ofSeconds(100), 10);
private static final int NUM_DOCS_IN_DUMMY_DATA = 1000;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 4a171128c8..0b59468e0d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -223,7 +223,7 @@ public class QueryExecutorTest {
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderAmount");
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME,
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
- 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
+ 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", false,
Collections.emptyMap()));
QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList());
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(),
ServerMetrics.get());
@@ -232,8 +232,8 @@ public class QueryExecutorTest {
TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock)
instanceResponse.getResultsBlock();
TimeSeriesBlock timeSeriesBlock =
resultsBlock.getTimeSeriesBuilderBlock().build();
assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
-
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]);
-
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1],
29885544.0);
+
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]);
+
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1],
29885544.0);
}
@Test
@@ -242,7 +242,7 @@ public class QueryExecutorTest {
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderItemCount");
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME,
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
- 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null));
+ 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", false,
Collections.emptyMap()));
QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(),
ServerMetrics.get());
@@ -260,7 +260,7 @@ public class QueryExecutorTest {
assertFalse(foundNewYork, "Found multiple time-series for New York");
foundNewYork = true;
Optional<Double> maxValue =
-
Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
+
Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
assertTrue(maxValue.isPresent());
assertEquals(maxValue.get().longValue(), 4L);
}
@@ -274,7 +274,7 @@ public class QueryExecutorTest {
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderItemCount");
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME,
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
- 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null));
+ 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", false,
Collections.emptyMap()));
QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(),
ServerMetrics.get());
@@ -292,7 +292,7 @@ public class QueryExecutorTest {
assertFalse(foundChicago, "Found multiple time-series for Chicago");
foundChicago = true;
Optional<Double> minValue =
-
Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
+
Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
assertTrue(minValue.isPresent());
assertEquals(minValue.get().longValue(), 0L);
}
diff --git
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 53844048a7..42515083c0 100644
---
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.tsdb.m3ql;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
@@ -84,7 +85,7 @@ public class M3TimeSeriesPlanner implements
TimeSeriesLogicalPlanner {
case "max":
Preconditions.checkState(commandId == 1, "Aggregation should be the
second command (fetch should be first)");
Preconditions.checkState(aggInfo == null, "Aggregation already set.
Only single agg allowed.");
- aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null);
+ aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), false,
Collections.emptyMap());
if (commands.get(commandId).size() > 1) {
String[] cols = commands.get(commandId).get(1).split(",");
groupByColumns =
Stream.of(cols).map(String::trim).collect(Collectors.toList());
diff --git
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
index 0330dff13b..cef90b69af 100644
---
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
+++
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java
@@ -34,7 +34,7 @@ public class KeepLastValueOperator extends
BaseTimeSeriesOperator {
TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries
-> {
for (TimeSeries series : unionOfSeries) {
- Double[] values = series.getValues();
+ Double[] values = series.getDoubleValues();
Double lastValue = null;
for (int index = 0; index < values.length; index++) {
if (values[index] != null) {
diff --git
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
index ca971c932c..661e4de498 100644
---
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
+++
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java
@@ -37,7 +37,7 @@ public class TransformNullOperator extends
BaseTimeSeriesOperator {
TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries
-> {
for (TimeSeries series : unionOfSeries) {
- Double[] values = series.getValues();
+ Double[] values = series.getDoubleValues();
for (int index = 0; index < values.length; index++) {
values[index] = values[index] == null ? _defaultValue :
values[index];
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
index cdbf668123..5978e29507 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.query.runtime.timeseries.serde;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -29,10 +31,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -51,7 +57,7 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
* the last column. As an example, consider the following, where FBV
represents the first bucket value of TimeBuckets.
* <pre>
*
+-------------+------------+-------------+---------------------------------+
- * | tag-0 | tag-1 | tag-n | values
|
+ * | tag-0 | tag-1 | tag-n | values (String[] or
double[]) |
*
+-------------+------------+-------------+---------------------------------+
* | null | null | null | [FBV, bucketSize,
numBuckets] |
*
+-------------+------------+-------------+---------------------------------+
@@ -74,6 +80,7 @@ public class TimeSeriesBlockSerde {
* Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN
can help detect divide by 0.
* TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
*/
+ private static final String VALUES_COLUMN_NAME = "__ts_serde_values";
private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
private TimeSeriesBlockSerde() {
@@ -85,12 +92,13 @@ public class TimeSeriesBlockSerde {
TransferableBlock transferableBlock =
TransferableBlockUtils.wrap(dataBlock);
List<String> tagNames =
generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
"Missing data schema in TransferableBlock"));
+ final DataSchema dataSchema = transferableBlock.getDataSchema();
List<Object[]> container = transferableBlock.getContainer();
- TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+ TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0), dataSchema);
Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
for (int index = 1; index < container.size(); index++) {
Object[] row = container.get(index);
- TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+ TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets,
dataSchema);
long seriesId = Long.parseLong(timeSeries.getId());
seriesMap.computeIfAbsent(seriesId, x -> new
ArrayList<>()).add(timeSeries);
}
@@ -112,17 +120,77 @@ public class TimeSeriesBlockSerde {
return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
}
+ /**
+ * This method is only used for encoding time-bucket-values to byte arrays,
when the TimeSeries value type
+ * is byte[][].
+ */
+ @VisibleForTesting
+ static byte[][] toBytesArray(double[] values) {
+ byte[][] result = new byte[values.length][8];
+ for (int index = 0; index < values.length; index++) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(result[index]);
+ byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+ byteBuffer.putDouble(values[index]);
+ }
+ return result;
+ }
+
+ /**
+ * This method is only used for decoding time-bucket-values from byte
arrays, when the TimeSeries value type
+ * is byte[][].
+ */
+ @VisibleForTesting
+ static double[] fromBytesArray(byte[][] bytes) {
+ double[] result = new double[bytes.length];
+ for (int index = 0; index < bytes.length; index++) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes[index]);
+ byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+ result[index] = byteBuffer.getDouble();
+ }
+ return result;
+ }
+
+ /**
+ * Since {@link DataBlockBuilder} does not support {@link
ColumnDataType#BYTES_ARRAY}, we have to encode the
+ * transmitted bytes as Hex to use String[].
+ */
+ @VisibleForTesting
+ static String[] encodeAsHex(byte[][] byteValues) {
+ String[] result = new String[byteValues.length];
+ for (int index = 0; index < result.length; index++) {
+ result[index] = Hex.encodeHexString(byteValues[index]);
+ }
+ return result;
+ }
+
+ /**
+ * Used for decoding Hex strings. See {@link
TimeSeriesBlockSerde#encodeAsHex} for more.
+ */
+ @VisibleForTesting
+ static byte[][] decodeFromHex(String[] hexEncodedValues) {
+ byte[][] result = new byte[hexEncodedValues.length][];
+ for (int index = 0; index < hexEncodedValues.length; index++) {
+ try {
+ result[index] = Hex.decodeHex(hexEncodedValues[index]);
+ } catch (DecoderException e) {
+ throw new RuntimeException("Error decoding byte[] value from encoded
hex string", e);
+ }
+ }
+ return result;
+ }
+
private static DataSchema generateDataSchema(TimeSeriesBlock
timeSeriesBlock) {
TimeSeries sampledTimeSeries =
sampleTimeSeries(timeSeriesBlock).orElse(null);
int numTags = sampledTimeSeries == null ? 0 :
sampledTimeSeries.getTagNames().size();
ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+ final ColumnDataType valueDataType = inferValueDataType(sampledTimeSeries);
String[] columnNames = new String[numTags + 1];
for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
dataTypes[tagIndex] = ColumnDataType.STRING;
}
- columnNames[numTags] = "__ts_values";
- dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+ columnNames[numTags] = VALUES_COLUMN_NAME;
+ dataTypes[numTags] = valueDataType;
return new DataSchema(columnNames, dataTypes);
}
@@ -144,6 +212,14 @@ public class TimeSeriesBlockSerde {
return Optional.of(timeSeriesList.get(0));
}
+ private static ColumnDataType inferValueDataType(@Nullable TimeSeries
timeSeries) {
+ if (timeSeries == null || timeSeries.getValues() instanceof Double[]) {
+ return ColumnDataType.DOUBLE_ARRAY;
+ }
+ // Byte values are encoded as hex array
+ return ColumnDataType.STRING_ARRAY;
+ }
+
private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema
dataSchema) {
int numColumns = dataSchema.getColumnNames().length;
Object[] result = new Object[numColumns];
@@ -153,12 +229,27 @@ public class TimeSeriesBlockSerde {
double firstBucketValue = timeBuckets.getTimeBuckets()[0];
double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
double numBuckets = timeBuckets.getNumBuckets();
- result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds,
numBuckets};
+ final ColumnDataType valuesDataType =
dataSchema.getColumnDataTypes()[numColumns - 1];
+ final double[] bucketsEncodedAsDouble = new double[]{firstBucketValue,
bucketSizeSeconds, numBuckets};
+ if (valuesDataType == ColumnDataType.DOUBLE_ARRAY) {
+ result[numColumns - 1] = bucketsEncodedAsDouble;
+ } else {
+ Preconditions.checkState(valuesDataType == ColumnDataType.STRING_ARRAY,
+ "Expected bytes_array column type. Found: %s", valuesDataType);
+ result[numColumns - 1] =
encodeAsHex(toBytesArray(bucketsEncodedAsDouble));
+ }
return result;
}
- private static TimeBuckets timeBucketsFromRow(Object[] row) {
- double[] values = (double[]) row[row.length - 1];
+ private static TimeBuckets timeBucketsFromRow(Object[] row, DataSchema
dataSchema) {
+ int numColumns = dataSchema.getColumnDataTypes().length;
+ double[] values;
+ if (dataSchema.getColumnDataTypes()[numColumns - 1] ==
ColumnDataType.STRING_ARRAY) {
+ byte[][] byteValues = decodeFromHex((String[]) row[row.length - 1]);
+ values = fromBytesArray(byteValues);
+ } else {
+ values = (double[]) row[row.length - 1];
+ }
long fbv = (long) values[0];
Duration window = Duration.ofSeconds((long) values[1]);
int numBuckets = (int) values[2];
@@ -172,14 +263,25 @@ public class TimeSeriesBlockSerde {
Object tagValue = timeSeries.getTagValues()[index];
result[index] = tagValue == null ? "null" : tagValue.toString();
}
- result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+ if (dataSchema.getColumnDataTypes()[numColumns - 1] ==
ColumnDataType.DOUBLE_ARRAY) {
+ result[numColumns - 1] = unboxDoubleArray(timeSeries.getDoubleValues());
+ } else {
+ result[numColumns - 1] = encodeAsHex(timeSeries.getBytesValues());
+ }
return result;
}
- private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[]
row, TimeBuckets timeBuckets) {
- Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+ private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[]
row, TimeBuckets timeBuckets,
+ DataSchema dataSchema) {
+ int numColumns = dataSchema.getColumnDataTypes().length;
Object[] tagValues = new Object[row.length - 1];
System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+ Object[] values;
+ if (dataSchema.getColumnDataTypes()[numColumns - 1] ==
ColumnDataType.DOUBLE_ARRAY) {
+ values = boxDoubleArray((double[]) row[row.length - 1]);
+ } else {
+ values = decodeFromHex((String[]) row[row.length - 1]);
+ }
return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null,
timeBuckets, values, tagNames, tagValues);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
index e85d17cf6c..43b3496dfb 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
@@ -45,7 +45,7 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
final String planId = "id";
final String tableName = "orderTable";
final String timeColumn = "orderTime";
- final AggInfo aggInfo = new AggInfo("SUM", null);
+ final AggInfo aggInfo = new AggInfo("SUM", false, Collections.emptyMap());
final String filterExpr = "cityName = 'Chicago'";
PhysicalTimeSeriesServerPlanVisitor serverPlanVisitor = new
PhysicalTimeSeriesServerPlanVisitor(
mock(QueryExecutor.class), mock(ExecutorService.class),
mock(ServerMetrics.class));
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
index c9fd929333..5a9079de2c 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
@@ -39,7 +39,7 @@ import static org.testng.Assert.*;
public class TimeSeriesExchangeReceiveOperatorTest {
private static final int NUM_SERVERS_QUERIED = 3;
- private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null);
+ private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", false,
Collections.emptyMap());
private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000,
Duration.ofSeconds(200), 4);
private static final List<String> TAG_NAMES = ImmutableList.of("city",
"zip");
private static final Object[] CHICAGO_SERIES_VALUES = new
Object[]{"Chicago", "60605"};
@@ -65,10 +65,10 @@ public class TimeSeriesExchangeReceiveOperatorTest {
assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1,
"Expected 1 series for Chicago");
assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected
1 series for SF");
// Ensure Chicago had series addition performed
- Double[] chicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+ Double[] chicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues();
assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0});
// Ensure SF had input series unmodified
- Double[] sanFranciscoSeriesValues =
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+ Double[] sanFranciscoSeriesValues =
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues();
assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
}
@@ -89,12 +89,12 @@ public class TimeSeriesExchangeReceiveOperatorTest {
assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2,
"Expected 2 series for Chicago");
assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected
1 series for SF");
// Ensure Chicago has unmodified series values
- Double[] firstChicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
- Double[] secondChicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues();
+ Double[] firstChicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues();
+ Double[] secondChicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getDoubleValues();
assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
// Ensure SF has input unmodified series values
- Double[] sanFranciscoSeriesValues =
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+ Double[] sanFranciscoSeriesValues =
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues();
assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
index f08d39ca0a..d488d8fbd0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -47,7 +47,7 @@ public class TimeSeriesBlockSerdeTest {
// 4. Compare ByteString-1 and ByteString-2.
// 5. Compare values of Block-1 and Block-2.
List<TimeSeriesBlock> blocks = List.of(buildBlockWithNoTags(),
buildBlockWithSingleTag(),
- buildBlockWithMultipleTags());
+ buildBlockWithMultipleTags(), buildBlockWithByteValues());
for (TimeSeriesBlock block1 : blocks) {
// Serialize, deserialize and serialize again
ByteString byteString1 =
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
@@ -61,6 +61,31 @@ public class TimeSeriesBlockSerdeTest {
}
}
+ @Test
+ public void testFromToBytesArray() {
+ // Encode and decode a double[] array to confirm the values turn out to be
the same.
+ double[][] inputs = new double[][]{
+ {131.0, 1.31, 0.0},
+ {1.0, 1231.0, 1.0}
+ };
+ for (double[] input : inputs) {
+ byte[][] encodedBytes = TimeSeriesBlockSerde.toBytesArray(input);
+ double[] decodedValues =
TimeSeriesBlockSerde.fromBytesArray(encodedBytes);
+ assertEquals(decodedValues, input);
+ }
+ }
+
+ @Test
+ public void testFromToHex() {
+ byte[][] input = new byte[][]{
+ {0x1a}, {0x00}, {0x77}, {Byte.MIN_VALUE},
+ {Byte.MAX_VALUE}, {0x13}, {0x19}, {0x77}
+ };
+ String[] encodedValues = TimeSeriesBlockSerde.encodeAsHex(input);
+ byte[][] decodedValues = TimeSeriesBlockSerde.decodeFromHex(encodedValues);
+ assertEquals(decodedValues, input);
+ }
+
/**
* Compares time series blocks in a way which makes it easy to debug test
failures when/if they happen in CI.
*/
@@ -132,4 +157,20 @@ public class TimeSeriesBlockSerdeTest {
new Double[]{Double.NaN, -1.0, -1231231.0, 3.14}, tagNames,
seriesTwoValues)));
return new TimeSeriesBlock(timeBuckets, seriesMap);
}
+
+ private static TimeSeriesBlock buildBlockWithByteValues() {
+ TimeBuckets timeBuckets = TIME_BUCKETS;
+ // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco,
zip=94107]
+ List<String> tagNames = ImmutableList.of("cityId", "zip");
+ Object[] seriesOneValues = new Object[]{"Chicago", "60605"};
+ Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"};
+ long seriesOneHash = TimeSeries.hash(seriesOneValues);
+ long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+ Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+ seriesMap.put(seriesOneHash, ImmutableList.of(new
TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+ new byte[][]{{0x13}, {0x1b}, {0x12}, {0x00}}, tagNames,
seriesOneValues)));
+ seriesMap.put(seriesTwoHash, ImmutableList.of(new
TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+ new byte[][]{{0x00}, {0x00}, {Byte.MIN_VALUE}, {0x7f}}, tagNames,
seriesTwoValues)));
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
}
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
index 46a3f68c31..32287f4d83 100644
---
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
+++
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.tsdb.planner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
@@ -102,8 +104,15 @@ public class TimeSeriesPlanFragmenter {
private static BaseTimeSeriesPlanNode
fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) {
if (planNode instanceof LeafTimeSeriesPlanNode) {
LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode;
- context._fragments.add(leafNode.withInputs(Collections.emptyList()));
- return new TimeSeriesExchangeNode(planNode.getId(),
Collections.emptyList(), leafNode.getAggInfo());
+ AggInfo currentAggInfo = leafNode.getAggInfo();
+ if (currentAggInfo == null) {
+ context._fragments.add(leafNode.withInputs(Collections.emptyList()));
+ } else {
+ Preconditions.checkState(!currentAggInfo.getIsPartial(),
+ "Leaf node in the logical plan should not have partial agg");
+
context._fragments.add(leafNode.withAggInfo(currentAggInfo.withPartialAggregation()));
+ }
+ return new TimeSeriesExchangeNode(planNode.getId(),
Collections.emptyList(), currentAggInfo);
}
List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
for (BaseTimeSeriesPlanNode input : planNode.getInputs()) {
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
index 0dc3e0502d..33b66bff1f 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.Map;
-import javax.annotation.Nullable;
/**
@@ -41,24 +40,47 @@ import javax.annotation.Nullable;
* Example usage:
* Map<String, String> params = new HashMap<>();
* params.put("window", "5m");
- * AggInfo aggInfo = new AggInfo("rate", params);
+ * AggInfo aggInfo = new AggInfo("rate", true, params);
*/
public class AggInfo {
private final String _aggFunction;
+ /**
+ * Denotes whether an aggregate is partial or full. When returning the
logical plan, language developers must not
+ * set this to true. This is used during Physical planning, and Pinot may
set this to true if the corresponding
+ * aggregate node is not guaranteed to have the full data. In such cases,
the physical plan will always add a
+ * complimentary full aggregate.
+ * <p>
+ * TODO(timeseries): Ideally we should remove this from the logical plan
completely.
+ * </p>
+ */
+ private final boolean _isPartial;
private final Map<String, String> _params;
@JsonCreator
- public AggInfo(@JsonProperty("aggFunction") String aggFunction,
- @JsonProperty("params") @Nullable Map<String, String> params) {
+ public AggInfo(@JsonProperty("aggFunction") String aggFunction,
@JsonProperty("isPartial") boolean isPartial,
+ @JsonProperty("params") Map<String, String> params) {
Preconditions.checkNotNull(aggFunction, "Received null aggFunction in
AggInfo");
_aggFunction = aggFunction;
+ _isPartial = isPartial;
_params = params != null ? params : Collections.emptyMap();
}
+ public AggInfo withPartialAggregation() {
+ return new AggInfo(_aggFunction, true, _params);
+ }
+
+ public AggInfo withFullAggregation() {
+ return new AggInfo(_aggFunction, false, _params);
+ }
+
public String getAggFunction() {
return _aggFunction;
}
+ public boolean getIsPartial() {
+ return _isPartial;
+ }
+
public Map<String, String> getParams() {
return Collections.unmodifiableMap(_params);
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 1986f4713d..3deb4c68e6 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -64,6 +64,11 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
_groupByExpressions = groupByExpressions;
}
+ public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
+ return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn,
_timeUnit, _offsetSeconds,
+ _filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
+ }
+
@Override
public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode>
newInputs) {
return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn,
_timeUnit, _offsetSeconds,
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
index 20ac1714a8..9cca55ebcb 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -19,7 +19,6 @@
package org.apache.pinot.tsdb.spi.series;
import java.util.List;
-import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -61,19 +60,14 @@ public abstract class BaseTimeSeriesBuilder {
public abstract void addValue(long timeValue, Double value);
- public void mergeSeries(TimeSeries series) {
- int numDataPoints = series.getValues().length;
- Long[] timeValues = Objects.requireNonNull(series.getTimeValues(),
- "Cannot merge series: found null timeValues");
- for (int i = 0; i < numDataPoints; i++) {
- addValue(timeValues[i], series.getValues()[i]);
- }
- }
-
+ /**
+ * Assumes Double[] values and attempts to merge the given series with this
builder. Implementations are
+ * recommended to override this to either optimize, or add bytes[][] values
from the input Series.
+ */
public void mergeAlignedSeries(TimeSeries series) {
- int numDataPoints = series.getValues().length;
+ int numDataPoints = series.getDoubleValues().length;
for (int i = 0; i < numDataPoints; i++) {
- addValueAtIndex(i, series.getValues()[i]);
+ addValueAtIndex(i, series.getDoubleValues()[i]);
}
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
index 55e2a9a730..4a2e452116 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tsdb.spi.series;
+import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -67,12 +68,16 @@ public class TimeSeries {
private final String _id;
private final Long[] _timeValues;
private final TimeBuckets _timeBuckets;
- private final Double[] _values;
+ private final Object[] _values;
private final List<String> _tagNames;
private final Object[] _tagValues;
- public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable
TimeBuckets timeBuckets, Double[] values,
+ // TODO(timeseries): Time series may also benefit from storing
extremal/outlier value traces, similar to Monarch.
+ // TODO(timeseries): It may make sense to allow types other than Double and
byte[] arrays.
+ public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable
TimeBuckets timeBuckets, Object[] values,
List<String> tagNames, Object[] tagValues) {
+ Preconditions.checkArgument(values instanceof Double[] || values
instanceof byte[][],
+ "Time Series can only take Double[] or byte[][] values");
_id = id;
_timeValues = timeValues;
_timeBuckets = timeBuckets;
@@ -95,10 +100,18 @@ public class TimeSeries {
return _timeBuckets;
}
- public Double[] getValues() {
+ public Object[] getValues() {
return _values;
}
+ public Double[] getDoubleValues() {
+ return (Double[]) _values;
+ }
+
+ public byte[][] getBytesValues() {
+ return (byte[][]) _values;
+ }
+
public List<String> getTagNames() {
return _tagNames;
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index 011cb6fbc6..d326ed49b5 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -44,7 +44,7 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
- new AggInfo("SUM", null), Collections.singletonList("cityName"));
+ new AggInfo("SUM", false, null),
Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " +
expectedEndTimeInFilter);
}
@@ -52,7 +52,7 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
- new AggInfo("SUM", null), Collections.singletonList("cityName"));
+ new AggInfo("SUM", false, null),
Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime
<= " + (expectedEndTimeInFilter - 123));
}
@@ -60,7 +60,7 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
- "value_col", new AggInfo("SUM", null),
Collections.singletonList("cityName"));
+ "value_col", new AggInfo("SUM", false, Collections.emptyMap()),
Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime > %s AND orderTime <= %s)",
nonEmptyFilter,
(expectedStartTimeInFilter - 123), (expectedEndTimeInFilter -
123)));
@@ -69,7 +69,8 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
- nonEmptyFilter, "value_col", new AggInfo("SUM", null),
Collections.singletonList("cityName"));
+ nonEmptyFilter, "value_col", new AggInfo("SUM", false,
Collections.emptyMap()),
+ Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime > %s AND orderTime <= %s)",
nonEmptyFilter,
(expectedStartTimeInFilter * 1000 - 123 * 1000),
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index 4bd5c37a5a..71bf2323fd 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -28,6 +28,7 @@ import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -40,7 +41,7 @@ public class TimeSeriesPlanSerdeTest {
LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable",
"myTimeColumn", TimeUnit.MILLISECONDS, 0L,
- "myFilterExpression", "myValueExpression", new AggInfo("SUM",
aggParams), new ArrayList<>());
+ "myFilterExpression", "myValueExpression", new AggInfo("SUM",
false, aggParams), new ArrayList<>());
BaseTimeSeriesPlanNode planNode =
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
@@ -52,6 +53,7 @@ public class TimeSeriesPlanSerdeTest {
assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression");
assertEquals(deserializedNode.getValueExpression(), "myValueExpression");
assertNotNull(deserializedNode.getAggInfo());
+ assertFalse(deserializedNode.getAggInfo().getIsPartial());
assertNotNull(deserializedNode.getAggInfo().getParams());
assertEquals(deserializedNode.getAggInfo().getParams().get("window"),
"5m");
assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java
new file mode 100644
index 0000000000..db651785e8
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.time.Duration;
+import java.util.Collections;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesTest {
+ private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(100,
Duration.ofSeconds(10), 10);
+
+ @Test
+ public void testTimeSeriesAcceptsDoubleValues() {
+ Double[] values = new Double[10];
+ TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS,
values, Collections.emptyList(),
+ new Object[0]);
+ assertEquals(timeSeries.getDoubleValues(), values);
+ }
+
+ @Test
+ public void testTimeSeriesAcceptsBytesValues() {
+ byte[][] byteValues = new byte[10][1231];
+ TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS,
byteValues, Collections.emptyList(),
+ new Object[0]);
+ assertEquals(timeSeries.getBytesValues(), byteValues);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testTimeSeriesDeniesWhenValuesNotDoubleOrBytes() {
+ Object[] someValues = new Long[10];
+ TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS,
someValues, Collections.emptyList(),
+ new Object[0]);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]