This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1b5858b5b4 [timeseries] Part-3: Add Time Series Exchange Operator,
Plan Node and Serde (#14611)
1b5858b5b4 is described below
commit 1b5858b5b4b5dffe45d5af73f3604d85bfa36132
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Dec 10 11:39:27 2024 -0600
[timeseries] Part-3: Add Time Series Exchange Operator, Plan Node and Serde
(#14611)
* Part-3: Add Time Series Exchange Operator, Plan Node and Serde
* add tests + fragmenter
* add license
* address feedback
* address more feedback
---
.../TimeSeriesExchangeReceiveOperator.java | 182 +++++++++++++++++++
.../TimeSeriesExchangeReceivePlanNode.java | 74 ++++++++
.../timeseries/serde/TimeSeriesBlockSerde.java | 201 +++++++++++++++++++++
.../TimeSeriesExchangeReceiveOperatorTest.java | 156 ++++++++++++++++
.../timeseries/serde/TimeSeriesBlockSerdeTest.java | 135 ++++++++++++++
.../pinot/tsdb/planner/TimeSeriesExchangeNode.java | 75 ++++++++
.../tsdb/planner/TimeSeriesPlanFragmenter.java | 118 ++++++++++++
.../tsdb/planner/TimeSeriesPlanFragmenterTest.java | 167 +++++++++++++++++
8 files changed, 1108 insertions(+)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
new file mode 100644
index 0000000000..79d49e0b46
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+/**
+ * <h2>Overview</h2>
+ * Receives and optionally aggregates the response from all servers for the
corresponding plan node.
+ *
+ * <h3>Aggregate Receive</h3>
+ * When a non-null {@link AggInfo} is passed, this operator will aggregate the
received data using the corresponding
+ * series builder created via {@link TimeSeriesBuilderFactory}.
+ *
+ * <h3>Non-Aggregate Receive</h3>
+ * When a null AggInfo is passed, then we don't perform any aggregation. If we
receive series with the same ID from
+ * different servers, we will simply append them to the list, creating a union.
+ */
+public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
+ /**
+ * Receiver will receive either TimeSeriesBlock or Throwable. And will have
at most _numServersQueried objects that
+ * can be polled.
+ */
+ private final BlockingQueue<Object> _receiver;
+ private final long _deadlineMs;
+ private final int _numServersQueried;
+ @Nullable
+ private final AggInfo _aggInfo;
+ private final TimeSeriesBuilderFactory _factory;
+
+ public TimeSeriesExchangeReceiveOperator(BlockingQueue<Object> receiver,
long deadlineMs, int numServersQueried,
+ @Nullable AggInfo aggInfo, TimeSeriesBuilderFactory
seriesBuilderFactory) {
+ super(Collections.emptyList());
+ Preconditions.checkArgument(numServersQueried > 0, "No servers to query in
receive operator");
+ _receiver = receiver;
+ _deadlineMs = deadlineMs;
+ _numServersQueried = numServersQueried;
+ _aggInfo = aggInfo;
+ _factory = seriesBuilderFactory;
+ }
+
+ @Override
+ public TimeSeriesBlock getNextBlock() {
+ try {
+ if (_aggInfo == null) {
+ return getNextBlockNoAggregation();
+ } else {
+ return getNextBlockWithAggregation();
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException(t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public String getExplainName() {
+ return "TIME_SERIES_EXCHANGE_RECEIVE";
+ }
+
+ @VisibleForTesting
+ protected Object poll(long remainingTimeMs)
+ throws InterruptedException {
+ return _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
+ }
+
+ private TimeSeriesBlock getNextBlockWithAggregation()
+ throws Throwable {
+ TimeBuckets timeBuckets = null;
+ Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
+ for (int index = 0; index < _numServersQueried; index++) {
+ // Step-1: Poll, and ensure we received a TimeSeriesBlock.
+ long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
+ Preconditions.checkState(remainingTimeMs > 0,
+ "Timed out before polling all servers. Successfully Polled: %s of
%s", index, _numServersQueried);
+ Object result = poll(remainingTimeMs);
+ Preconditions.checkNotNull(result, "Timed out waiting for response.
Waited: %s ms", remainingTimeMs);
+ if (result instanceof Throwable) {
+ throw (Throwable) result;
+ }
+ Preconditions.checkState(result instanceof TimeSeriesBlock,
+ "Found unexpected object. This is a bug: %s", result.getClass());
+ // Step-2: Init timeBuckets and ensure they are the same across all
servers.
+ TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+ if (timeBuckets == null) {
+ timeBuckets = blockToMerge.getTimeBuckets();
+ } else {
+
Preconditions.checkState(timeBuckets.equals(blockToMerge.getTimeBuckets()),
+ "Found unequal time buckets from server response");
+ }
+ // Step-3: Merge new block with existing block.
+ for (var entry : blockToMerge.getSeriesMap().entrySet()) {
+ long seriesHash = entry.getKey();
+ List<TimeSeries> currentSeriesList = entry.getValue();
+ TimeSeries sampledTimeSeries = currentSeriesList.get(0);
+ // Init seriesBuilder if required
+ BaseTimeSeriesBuilder seriesBuilder = seriesBuilderMap.get(seriesHash);
+ if (seriesBuilder == null) {
+ seriesBuilder = _factory.newTimeSeriesBuilder(
+ _aggInfo, Long.toString(seriesHash), timeBuckets,
sampledTimeSeries.getTagNames(),
+ sampledTimeSeries.getTagValues());
+ seriesBuilderMap.put(seriesHash, seriesBuilder);
+ }
+ for (TimeSeries timeSeries : currentSeriesList) {
+ seriesBuilder.mergeAlignedSeries(timeSeries);
+ }
+ }
+ }
+ // Convert series builders to series and return.
+ Map<Long, List<TimeSeries>> seriesMap = new
HashMap<>(seriesBuilderMap.size());
+ for (var entry : seriesBuilderMap.entrySet()) {
+ long seriesHash = entry.getKey();
+ List<TimeSeries> timeSeriesList = new ArrayList<>();
+ timeSeriesList.add(entry.getValue().build());
+ seriesMap.put(seriesHash, timeSeriesList);
+ }
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
+
+ private TimeSeriesBlock getNextBlockNoAggregation()
+ throws Throwable {
+ Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
+ TimeBuckets timeBuckets = null;
+ for (int index = 0; index < _numServersQueried; index++) {
+ long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
+ Preconditions.checkState(remainingTimeMs > 0, "Timed out before polling
exchange receive");
+ Object result = _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
+ Preconditions.checkNotNull(result, "Timed out waiting for response.
Waited: %s ms", remainingTimeMs);
+ if (result instanceof Throwable) {
+ throw ((Throwable) result);
+ }
+ Preconditions.checkState(result instanceof TimeSeriesBlock,
+ "Found unexpected object. This is a bug: %s", result.getClass());
+ TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+ if (timeBuckets == null) {
+ timeBuckets = blockToMerge.getTimeBuckets();
+ } else {
+
Preconditions.checkState(timeBuckets.equals(blockToMerge.getTimeBuckets()),
+ "Found unequal time buckets from server response");
+ }
+ for (var entry : blockToMerge.getSeriesMap().entrySet()) {
+ long seriesHash = entry.getKey();
+ List<TimeSeries> timeSeriesList = entry.getValue();
+ timeSeriesMap.computeIfAbsent(seriesHash, x -> new
ArrayList<>()).addAll(timeSeriesList);
+ }
+ }
+ Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange
receive operator");
+ return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
new file mode 100644
index 0000000000..42416563e5
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+/**
+ * With the broker-reduce mode in Time Series Engine, this node becomes the
leaf stage for the broker. In other words,
+ * the plan fragment that runs in the broker will always have this node in the
leaves.
+ */
+public class TimeSeriesExchangeReceivePlanNode extends BaseTimeSeriesPlanNode {
+ private final long _deadlineMs;
+ private final AggInfo _aggInfo;
+ private final TimeSeriesBuilderFactory _factory;
+ private BlockingQueue<Object> _receiver;
+ private int _numServersQueried;
+
+ public TimeSeriesExchangeReceivePlanNode(String id, long deadlineMs,
@Nullable AggInfo aggInfo,
+ TimeSeriesBuilderFactory factory) {
+ super(id, Collections.emptyList());
+ _deadlineMs = deadlineMs;
+ _aggInfo = aggInfo;
+ _factory = factory;
+ }
+
+ public void init(BlockingQueue<Object> receiver, int numExpectedBlocks) {
+ _receiver = receiver;
+ _numServersQueried = numExpectedBlocks;
+ }
+
+ @Override
+ public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode>
newInputs) {
+ return new TimeSeriesExchangeReceivePlanNode(_id, _deadlineMs, _aggInfo,
_factory);
+ }
+
+ @Override
+ public String getKlass() {
+ return TimeSeriesExchangeReceivePlanNode.class.getName();
+ }
+
+ @Override
+ public String getExplainName() {
+ return "TIME_SERIES_EXCHANGE_RECEIVE";
+ }
+
+ @Override
+ public BaseTimeSeriesOperator run() {
+ return new TimeSeriesExchangeReceiveOperator(_receiver, _deadlineMs,
_numServersQueried, _aggInfo, _factory);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
new file mode 100644
index 0000000000..cdbf668123
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries.serde;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * Implements a simple Serde mechanism for the Time Series Block. This is used
for transferring data between servers
+ * and brokers. The approach is to use a {@link TransferableBlock} and rely on
the existing serialization code to avoid
+ * re-inventing the wheel. Once the time-series engine coalesces with the
Multistage Engine, we will anyway use
+ * TransferableBlock for data transfers.
+ * <p>
+ * The {@link TimeSeriesBlock} is converted to and from a table, where the
first row contains information about the
+ * time-buckets. For each tag/label in the query, there's a dedicated
column, and the Double values are stored in
+ * the last column. As an example, consider the following, where FBV
represents the first bucket value of TimeBuckets.
+ * <pre>
+ *
+-------------+------------+-------------+---------------------------------+
+ * | tag-0 | tag-1 | tag-n | values
|
+ *
+-------------+------------+-------------+---------------------------------+
+ * | null | null | null | [FBV, bucketSize,
numBuckets] |
+ *
+-------------+------------+-------------+---------------------------------+
+ * | Chicago | 60607 | ... | [value-0, value-1, ...
value-x] |
+ *
+-------------+------------+-------------+---------------------------------+
+ * | San Fran. | 94107 | ... | [value-0, value-1, ...
value-x] |
+ *
+-------------+------------+-------------+---------------------------------+
+ * </pre>
+ * TODO(timeseries): When we support Time Series selection queries, we will
likely need a special column instead of
+ * tags, because one could store data in JSON Blobs and the series may
have different tags/labels.
+ * </p>
+ * <p>
+ * TODO(timeseries): One source of inefficiency is boxing/unboxing of Double
arrays.
+ * TODO(timeseries): The other is tag values being Object[]. We should make
tag values String[].
+ * </p>
+ */
+public class TimeSeriesBlockSerde {
+ /**
+ * Since DataBlock can only handle primitive double[] arrays, we use
Double.MIN_VALUE to represent nulls.
+ * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN
can help detect divide by 0.
+ * TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
+ */
+ private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
+
+ private TimeSeriesBlockSerde() {
+ }
+
+ public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer
readOnlyByteBuffer)
+ throws IOException {
+ DataBlock dataBlock = DataBlockUtils.readFrom(readOnlyByteBuffer);
+ TransferableBlock transferableBlock =
TransferableBlockUtils.wrap(dataBlock);
+ List<String> tagNames =
generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
+ "Missing data schema in TransferableBlock"));
+ List<Object[]> container = transferableBlock.getContainer();
+ TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+ Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+ for (int index = 1; index < container.size(); index++) {
+ Object[] row = container.get(index);
+ TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+ long seriesId = Long.parseLong(timeSeries.getId());
+ seriesMap.computeIfAbsent(seriesId, x -> new
ArrayList<>()).add(timeSeries);
+ }
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
+
+ public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock
timeSeriesBlock)
+ throws IOException {
+ TimeBuckets timeBuckets =
Objects.requireNonNull(timeSeriesBlock.getTimeBuckets());
+ List<Object[]> container = new ArrayList<>();
+ DataSchema dataSchema = generateDataSchema(timeSeriesBlock);
+ container.add(timeBucketsToRow(timeBuckets, dataSchema));
+ for (var entry : timeSeriesBlock.getSeriesMap().entrySet()) {
+ for (TimeSeries timeSeries : entry.getValue()) {
+ container.add(timeSeriesToRow(timeSeries, dataSchema));
+ }
+ }
+ TransferableBlock transferableBlock = new TransferableBlock(container,
dataSchema, DataBlock.Type.ROW);
+ return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
+ }
+
+ private static DataSchema generateDataSchema(TimeSeriesBlock
timeSeriesBlock) {
+ TimeSeries sampledTimeSeries =
sampleTimeSeries(timeSeriesBlock).orElse(null);
+ int numTags = sampledTimeSeries == null ? 0 :
sampledTimeSeries.getTagNames().size();
+ ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+ String[] columnNames = new String[numTags + 1];
+ for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
+ columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
+ dataTypes[tagIndex] = ColumnDataType.STRING;
+ }
+ columnNames[numTags] = "__ts_values";
+ dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+ return new DataSchema(columnNames, dataTypes);
+ }
+
+ private static List<String> generateTagNames(DataSchema dataSchema) {
+ String[] columnNames = dataSchema.getColumnNames();
+ List<String> tagNames = new ArrayList<>(columnNames.length - 1);
+ for (int index = 0; index < columnNames.length - 1; index++) {
+ tagNames.add(columnNames[index]);
+ }
+ return tagNames;
+ }
+
+ private static Optional<TimeSeries> sampleTimeSeries(TimeSeriesBlock
timeSeriesBlock) {
+ if (timeSeriesBlock.getSeriesMap().isEmpty()) {
+ return Optional.empty();
+ }
+ List<TimeSeries> timeSeriesList =
timeSeriesBlock.getSeriesMap().values().iterator().next();
+ Preconditions.checkState(!timeSeriesList.isEmpty(), "Found empty
time-series list");
+ return Optional.of(timeSeriesList.get(0));
+ }
+
+ private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema
dataSchema) {
+ int numColumns = dataSchema.getColumnNames().length;
+ Object[] result = new Object[numColumns];
+ for (int index = 0; index < numColumns - 1; index++) {
+ result[index] = "null";
+ }
+ double firstBucketValue = timeBuckets.getTimeBuckets()[0];
+ double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
+ double numBuckets = timeBuckets.getNumBuckets();
+ result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds,
numBuckets};
+ return result;
+ }
+
+ private static TimeBuckets timeBucketsFromRow(Object[] row) {
+ double[] values = (double[]) row[row.length - 1];
+ long fbv = (long) values[0];
+ Duration window = Duration.ofSeconds((long) values[1]);
+ int numBuckets = (int) values[2];
+ return TimeBuckets.ofSeconds(fbv, window, numBuckets);
+ }
+
+ private static Object[] timeSeriesToRow(TimeSeries timeSeries, DataSchema
dataSchema) {
+ int numColumns = dataSchema.getColumnNames().length;
+ Object[] result = new Object[numColumns];
+ for (int index = 0; index < numColumns - 1; index++) {
+ Object tagValue = timeSeries.getTagValues()[index];
+ result[index] = tagValue == null ? "null" : tagValue.toString();
+ }
+ result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+ return result;
+ }
+
+ private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[]
row, TimeBuckets timeBuckets) {
+ Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+ Object[] tagValues = new Object[row.length - 1];
+ System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+ return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null,
timeBuckets, values, tagNames, tagValues);
+ }
+
+ private static double[] unboxDoubleArray(Double[] values) {
+ double[] result = new double[values.length];
+ for (int index = 0; index < result.length; index++) {
+ result[index] = values[index] == null ? NULL_PLACEHOLDER : values[index];
+ }
+ return result;
+ }
+
+ private static Double[] boxDoubleArray(double[] values) {
+ Double[] result = new Double[values.length];
+ for (int index = 0; index < result.length; index++) {
+ result[index] = values[index] == NULL_PLACEHOLDER ? null : values[index];
+ }
+ return result;
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
new file mode 100644
index 0000000000..c9fd929333
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesExchangeReceiveOperatorTest {
+ private static final int NUM_SERVERS_QUERIED = 3;
+ private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null);
+ private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000,
Duration.ofSeconds(200), 4);
+ private static final List<String> TAG_NAMES = ImmutableList.of("city",
"zip");
+ private static final Object[] CHICAGO_SERIES_VALUES = new
Object[]{"Chicago", "60605"};
+ private static final Object[] SF_SERIES_VALUES = new Object[]{"San
Francisco", "94107"};
+ private static final Long CHICAGO_SERIES_HASH =
TimeSeries.hash(CHICAGO_SERIES_VALUES);
+ private static final Long SF_SERIES_HASH = TimeSeries.hash(SF_SERIES_VALUES);
+ private static final SimpleTimeSeriesBuilderFactory SERIES_BUILDER_FACTORY =
new SimpleTimeSeriesBuilderFactory();
+
+ @Test
+ public void testGetNextBlockWithAggregation() {
+ // Setup test
+ long deadlineMs = Long.MAX_VALUE;
+ ArrayBlockingQueue<Object> blockingQueue = new
ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+ blockingQueue.addAll(generateTimeSeriesBlocks());
+ TimeSeriesExchangeReceiveOperator operator = new
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, SUM_AGG_INFO, SERIES_BUILDER_FACTORY);
+ // Run test
+ TimeSeriesBlock block = operator.nextBlock();
+ // Validate results
+ assertEquals(block.getSeriesMap().size(), 2);
+ assertTrue(block.getSeriesMap().containsKey(CHICAGO_SERIES_HASH), "Chicago
series not present in received block");
+ assertTrue(block.getSeriesMap().containsKey(SF_SERIES_HASH), "SF series
not present in received block");
+ assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1,
"Expected 1 series for Chicago");
+ assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected
1 series for SF");
+ // Ensure Chicago had series addition performed
+ Double[] chicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+ assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0});
+ // Ensure SF had input series unmodified
+ Double[] sanFranciscoSeriesValues =
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+ assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
+ }
+
+ @Test
+ public void testGetNextBlockNoAggregation() {
+ // Setup test
+ long deadlineMs = Long.MAX_VALUE;
+ ArrayBlockingQueue<Object> blockingQueue = new
ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+ blockingQueue.addAll(generateTimeSeriesBlocks());
+ TimeSeriesExchangeReceiveOperator operator = new
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, null, SERIES_BUILDER_FACTORY);
+ // Run test
+ TimeSeriesBlock block = operator.nextBlock();
+ // Validate results
+ assertEquals(block.getSeriesMap().size(), 2);
+ assertTrue(block.getSeriesMap().containsKey(CHICAGO_SERIES_HASH), "Chicago
series not present in received block");
+ assertTrue(block.getSeriesMap().containsKey(SF_SERIES_HASH), "SF series
not present in received block");
+ assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2,
"Expected 2 series for Chicago");
+ assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected
1 series for SF");
+ // Ensure Chicago has unmodified series values
+ Double[] firstChicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+ Double[] secondChicagoSeriesValues =
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues();
+ assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
+ assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
+ // Ensure SF has input unmodified series values
+ Double[] sanFranciscoSeriesValues =
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+ assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0,
10.0});
+ }
+
+ @Test
+ public void testGetNextBlockFailure() {
+ // Setup test
+ long deadlineMs = Long.MAX_VALUE;
+ ArrayBlockingQueue<Object> blockingQueue = new
ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+ blockingQueue.add(new TimeoutException("Test error"));
+ // Run test with aggregation
+ try {
+ TimeSeriesExchangeReceiveOperator operator = new
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, SUM_AGG_INFO, SERIES_BUILDER_FACTORY);
+ TimeSeriesBlock block = operator.nextBlock();
+ fail();
+ } catch (Throwable t) {
+ assertEquals(t.getMessage(), "Test error");
+ }
+ blockingQueue.add(new TimeoutException("Test error"));
+ try {
+ TimeSeriesExchangeReceiveOperator operator = new
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, null, SERIES_BUILDER_FACTORY);
+ TimeSeriesBlock block = operator.nextBlock();
+ fail();
+ } catch (Throwable t) {
+ assertEquals(t.getMessage(), "Test error");
+ }
+ }
+
+ private List<TimeSeriesBlock> generateTimeSeriesBlocks() {
+ List<TimeSeriesBlock> seriesBlocks = new ArrayList<>();
+ {
+ Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+ seriesMap.put(CHICAGO_SERIES_HASH,
ImmutableList.of(createChicagoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+ seriesMap.put(SF_SERIES_HASH,
ImmutableList.of(createSanFranciscoSeries(new Double[]{10.0, 10.0, 10.0,
10.0})));
+ seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+ }
+ {
+ Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+ seriesMap.put(CHICAGO_SERIES_HASH,
ImmutableList.of(createChicagoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+ seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+ }
+ {
+ Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+ seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+ }
+ // Shuffle the output to test multiple scenarios over time
+ Collections.shuffle(seriesBlocks);
+ return seriesBlocks;
+ }
+
+ private TimeSeries createChicagoSeries(Double[] values) {
+ return new TimeSeries(CHICAGO_SERIES_HASH.toString(), null, TIME_BUCKETS,
values, TAG_NAMES, CHICAGO_SERIES_VALUES);
+ }
+
+ private TimeSeries createSanFranciscoSeries(Double[] values) {
+ return new TimeSeries(SF_SERIES_HASH.toString(), null, TIME_BUCKETS,
values, TAG_NAMES, SF_SERIES_VALUES);
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
new file mode 100644
index 0000000000..f08d39ca0a
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries.serde;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesBlockSerdeTest {
+  // The last argument is assumed to be the number of buckets (5), so every series below carries five values,
+  // one per bucket.
+  private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(200), 5);
+
+  @Test
+  public void testSerde()
+      throws IOException {
+    // To test serde of TimeSeriesBlock, we do the following:
+    // 1. Serialize the time-series block (say Block-1) to get ByteString-1.
+    // 2. Deserialize ByteString-1 to get Block-2.
+    // 3. Serialize Block-2 to get ByteString-2.
+    // 4. Compare ByteString-1 and ByteString-2 byte-for-byte.
+    // 5. Compare values of Block-1 and Block-2.
+    List<TimeSeriesBlock> blocks = List.of(buildBlockWithNoTags(), buildBlockWithSingleTag(),
+        buildBlockWithMultipleTags());
+    for (TimeSeriesBlock block1 : blocks) {
+      // Serialize, deserialize and serialize again
+      ByteString byteString1 = TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
+      TimeSeriesBlock block2 = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer());
+      ByteString byteString2 = TimeSeriesBlockSerde.serializeTimeSeriesBlock(block2);
+      // Serialization is deterministic, so both payloads must match exactly. Compare the raw bytes rather than
+      // toStringUtf8(): the payload is binary, and every malformed UTF-8 sequence decodes to the same replacement
+      // character, which could make two different payloads look identical.
+      assertEquals(byteString2, byteString1, "Re-serialized block differs from the originally serialized block");
+      // Compare block1 and block2
+      compareBlocks(block1, block2);
+    }
+  }
+
+  /**
+   * Compares time series blocks in a way which makes it easy to debug test failures when/if they happen in CI.
+   */
+  private static void compareBlocks(TimeSeriesBlock block1, TimeSeriesBlock block2) {
+    assertEquals(block1.getTimeBuckets(), block2.getTimeBuckets(), "Time buckets are different across blocks");
+    assertEquals(block1.getSeriesMap().size(), block2.getSeriesMap().size(), String.format(
+        "Different number of series in blocks: %s and %s", block1.getSeriesMap().size(),
+        block2.getSeriesMap().size()));
+    assertEquals(block1.getSeriesMap().keySet(), block2.getSeriesMap().keySet(),
+        String.format("Series blocks have different keys: %s vs %s",
+            block1.getSeriesMap().keySet(), block2.getSeriesMap().keySet()));
+    for (long seriesHash : block1.getSeriesMap().keySet()) {
+      List<TimeSeries> seriesList1 = block1.getSeriesMap().get(seriesHash);
+      List<TimeSeries> seriesList2 = block2.getSeriesMap().get(seriesHash);
+      compareTimeSeries(seriesList1, seriesList2);
+    }
+  }
+
+  /**
+   * Compares two lists of series stored under the same key, position by position: tag names, tag values and the
+   * per-bucket values must all survive the round trip.
+   */
+  private static void compareTimeSeries(List<TimeSeries> series1, List<TimeSeries> series2) {
+    assertEquals(series1.size(), series2.size(),
+        String.format("Different count of series with the same id: %s vs %s", series1.size(), series2.size()));
+    for (int index = 0; index < series1.size(); index++) {
+      TimeSeries seriesOne = series1.get(index);
+      TimeSeries seriesTwo = series2.get(index);
+      assertEquals(seriesOne.getTagNames(), seriesTwo.getTagNames(), "Tag names are different");
+      // Tag values are what a serde is most likely to mangle (ordering/typing), so check them explicitly.
+      assertEquals(seriesOne.getTagValues(), seriesTwo.getTagValues(), "Tag values are different");
+      assertEquals(seriesOne.getValues(), seriesTwo.getValues(), "Series values are different");
+    }
+  }
+
+  private static TimeSeriesBlock buildBlockWithNoTags() {
+    // Single series: []
+    List<String> tagNames = Collections.emptyList();
+    Object[] seriesValues = new Object[0];
+    long seriesHash = TimeSeries.hash(seriesValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesHash, ImmutableList.of(new TimeSeries(Long.toString(seriesHash), null, TIME_BUCKETS,
+        new Double[]{null, 123.0, 0.0, 1.0, 2.0}, tagNames, seriesValues)));
+    return new TimeSeriesBlock(TIME_BUCKETS, seriesMap);
+  }
+
+  private static TimeSeriesBlock buildBlockWithSingleTag() {
+    // Series are: [cityId=Chicago] and [cityId=San Francisco]
+    List<String> tagNames = ImmutableList.of("cityId");
+    Object[] seriesOneValues = new Object[]{"Chicago"};
+    Object[] seriesTwoValues = new Object[]{"San Francisco"};
+    long seriesOneHash = TimeSeries.hash(seriesOneValues);
+    long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesOneHash, ImmutableList.of(new TimeSeries(Long.toString(seriesOneHash), null, TIME_BUCKETS,
+        new Double[]{null, 123.0, 0.0, 1.0, 2.0}, tagNames, seriesOneValues)));
+    seriesMap.put(seriesTwoHash, ImmutableList.of(new TimeSeries(Long.toString(seriesTwoHash), null, TIME_BUCKETS,
+        new Double[]{null, null, null, null, null}, tagNames, seriesTwoValues)));
+    return new TimeSeriesBlock(TIME_BUCKETS, seriesMap);
+  }
+
+  private static TimeSeriesBlock buildBlockWithMultipleTags() {
+    // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco, zip=94107]
+    List<String> tagNames = ImmutableList.of("cityId", "zip");
+    Object[] seriesOneValues = new Object[]{"Chicago", "60605"};
+    Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"};
+    long seriesOneHash = TimeSeries.hash(seriesOneValues);
+    long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesOneHash, ImmutableList.of(new TimeSeries(Long.toString(seriesOneHash), null, TIME_BUCKETS,
+        new Double[]{null, 123.0, Double.NaN, 1.0, 2.0}, tagNames, seriesOneValues)));
+    seriesMap.put(seriesTwoHash, ImmutableList.of(new TimeSeries(Long.toString(seriesTwoHash), null, TIME_BUCKETS,
+        new Double[]{Double.NaN, -1.0, -1231231.0, 3.14, 0.0}, tagNames, seriesTwoValues)));
+    return new TimeSeriesBlock(TIME_BUCKETS, seriesMap);
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
new file mode 100644
index 0000000000..399a2bd28d
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+/**
+ * Logical-plan-only node that marks where a plan is cut into fragments. It never reaches the physical/dispatchable
+ * plan: just like {@link LeafTimeSeriesPlanNode}, a physical plan visitor is expected to swap it for an equivalent
+ * physical node that can actually produce an operator. Consequently {@link #run()} on this class always throws.
+ * <p>
+ * <b>Note:</b> This node is deliberately kept out of pinot-timeseries-spi: for now, language developers should not
+ * control how and when an exchange is run.
+ * </p>
+ */
+public class TimeSeriesExchangeNode extends BaseTimeSeriesPlanNode {
+  /** Aggregation info attached to this exchange; {@code null} when there is none. */
+  @Nullable
+  private final AggInfo _aggInfo;
+
+  @JsonCreator
+  public TimeSeriesExchangeNode(@JsonProperty("id") String id,
+      @JsonProperty("inputs") List<BaseTimeSeriesPlanNode> inputs,
+      @Nullable @JsonProperty("aggInfo") AggInfo aggInfo) {
+    super(id, inputs);
+    this._aggInfo = aggInfo;
+  }
+
+  @Override
+  public String getExplainName() {
+    return "TIME_SERIES_BROKER_RECEIVE";
+  }
+
+  @Override
+  public String getKlass() {
+    return TimeSeriesExchangeNode.class.getName();
+  }
+
+  /** Returns the aggregation info passed at construction, or {@code null} if none was given. */
+  @Nullable
+  public AggInfo getAggInfo() {
+    return this._aggInfo;
+  }
+
+  /** Returns a copy of this node with the same id and aggregation info but the given inputs. */
+  @Override
+  public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) {
+    return new TimeSeriesExchangeNode(this._id, newInputs, this._aggInfo);
+  }
+
+  /**
+   * Always fails: this node must be replaced by a physical plan node before execution.
+   *
+   * @throws IllegalStateException unconditionally
+   */
+  @Override
+  public BaseTimeSeriesOperator run() {
+    throw new IllegalStateException("Time Series Exchange should have been replaced with a physical plan node");
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
new file mode 100644
index 0000000000..46a3f68c31
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+/**
+ * Fragments the plan into executable units. Since we only support Broker-Reduce for Time Series Queries at present,
+ * we will have 1 fragment for the broker, and 1 fragment for each {@link LeafTimeSeriesPlanNode}.
+ * <p>
+ * As an example, say we have the following plan:
+ * </p>
+ * <pre>
+ *            +------------+
+ *            |   Node-1   |
+ *            +------------+
+ *              /        \
+ *   +------------+    +------------+
+ *   |   Node-2   |    |   Leaf-2   |
+ *   +------------+    +------------+
+ *        /
+ *   +------------+
+ *   |   Leaf-1   |
+ *   +------------+
+ * </pre>
+ * <p>
+ * The plan fragmenter will convert this to the following. Fragment-1, aka the Broker's plan fragment, has every leaf
+ * replaced by an exchange node carrying the leaf's id:
+ * </p>
+ * <pre>
+ *            +------------+
+ *            |   Node-1   |
+ *            +------------+
+ *              /        \
+ *   +------------+    +------------+
+ *   |   Node-2   |    |  Exchange  |
+ *   +------------+    +------------+
+ *        /
+ *   +------------+
+ *   |  Exchange  |
+ *   +------------+
+ * </pre>
+ * <p>
+ * Fragment-2 and Fragment-3 are the server fragments, one per leaf:
+ * </p>
+ * <pre>
+ *   +------------+      +------------+
+ *   |   Leaf-1   |      |   Leaf-2   |
+ *   +------------+      +------------+
+ * </pre>
+ */
+public final class TimeSeriesPlanFragmenter {
+  private TimeSeriesPlanFragmenter() {
+  }
+
+  /**
+   * Fragments the plan as described in {@link TimeSeriesPlanFragmenter}. The first element of the list is the broker
+   * fragment, and the other elements are the server fragments. For single-node queries, this pushes down the entire
+   * plan to the servers.
+   * <p>
+   * <b>Note:</b> This method may return cloned plan nodes, so you should use them as the plan subsequently.
+   * </p>
+   *
+   * @param rootNode root of the logical plan to fragment
+   * @param isSingleServerQuery when true, the plan is not split: the result is an input-less exchange (same id as
+   *                            {@code rootNode}, no agg info) followed by {@code rootNode} itself
+   * @return the broker fragment followed by the server fragments, in leaf visitation order
+   */
+  public static List<BaseTimeSeriesPlanNode> getFragments(BaseTimeSeriesPlanNode rootNode,
+      boolean isSingleServerQuery) {
+    if (isSingleServerQuery) {
+      // Whole plan runs on the server; the broker only needs an exchange to receive its output.
+      return ImmutableList.of(
+          new TimeSeriesExchangeNode(rootNode.getId(), Collections.emptyList(), null), rootNode);
+    }
+    // Only allocate traversal state on the path that actually walks the plan.
+    Context context = new Context();
+    List<BaseTimeSeriesPlanNode> result = new ArrayList<>();
+    // Must run before reading context._fragments: the traversal is what collects the server fragments.
+    result.add(fragmentRecursively(rootNode, context));
+    result.addAll(context._fragments);
+    return result;
+  }
+
+  /**
+   * Rebuilds {@code planNode}'s subtree with every leaf replaced by an exchange node, appending a copy of each leaf
+   * (with no inputs) to {@code context} as a server fragment.
+   */
+  private static BaseTimeSeriesPlanNode fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) {
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
+      LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode;
+      context._fragments.add(leafNode.withInputs(Collections.emptyList()));
+      // The exchange keeps the leaf's id and agg info so the broker side can be matched back to this fragment.
+      return new TimeSeriesExchangeNode(planNode.getId(), Collections.emptyList(), leafNode.getAggInfo());
+    }
+    List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>(planNode.getInputs().size());
+    for (BaseTimeSeriesPlanNode input : planNode.getInputs()) {
+      newInputs.add(fragmentRecursively(input, context));
+    }
+    return planNode.withInputs(newInputs);
+  }
+
+  /** Mutable state shared across one fragmentation pass: the server fragments collected so far. */
+  private static final class Context {
+    private final List<BaseTimeSeriesPlanNode> _fragments = new ArrayList<>();
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
new file mode 100644
index 0000000000..8727f64ddc
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesPlanFragmenterTest {
+  @Test
+  public void testGetFragmentsWithMultipleLeafNodes() {
+    /*
+     * Create Input:
+     *          Node-1
+     *         /      \
+     *      Node-2   Leaf-2
+     *       /
+     *    Leaf-1
+     * Expected Outputs:
+     *  Fragment-1:
+     *          Node-1
+     *         /      \
+     *      Node-2   Exchange (named Leaf-2)
+     *       /
+     *    Exchange (named Leaf-1)
+     *  Fragment-2:
+     *    Leaf-1
+     *  Fragment-3:
+     *    Leaf-2
+     */
+    BaseTimeSeriesPlanNode nodeOne = createSamplePlan();
+    List<BaseTimeSeriesPlanNode> fragments = TimeSeriesPlanFragmenter.getFragments(nodeOne, false);
+    // Test whether correct number of fragments generated
+    assertEquals(fragments.size(), 3);
+    // Test whether fragment roots are correct
+    assertEquals(fragments.get(0).getId(), "Node-1");
+    assertEquals(fragments.get(1).getId(), "Leaf-1");
+    assertEquals(fragments.get(2).getId(), "Leaf-2");
+    // Test whether broker fragment has the right inputs
+    {
+      BaseTimeSeriesPlanNode brokerFragment = fragments.get(0);
+      assertEquals(brokerFragment.getInputs().size(), 2);
+      // Left and right inputs should have IDs Node-2 and Leaf-2.
+      BaseTimeSeriesPlanNode leftInput = brokerFragment.getInputs().get(0);
+      BaseTimeSeriesPlanNode rightInput = brokerFragment.getInputs().get(1);
+      assertEquals(leftInput.getId(), "Node-2");
+      assertEquals(rightInput.getId(), "Leaf-2");
+      // Right input should be an input-less exchange
+      assertTrue(rightInput instanceof TimeSeriesExchangeNode, "Node should have been replaced by Exchange");
+      assertTrue(rightInput.getInputs().isEmpty(), "Exchange should not have inputs");
+      // Input for Left input should be an input-less exchange
+      assertEquals(leftInput.getInputs().size(), 1);
+      BaseTimeSeriesPlanNode leftExchange = leftInput.getInputs().get(0);
+      assertEquals(leftExchange.getId(), "Leaf-1");
+      assertTrue(leftExchange instanceof TimeSeriesExchangeNode, "Node should have been replaced by Exchange");
+      assertTrue(leftExchange.getInputs().isEmpty(), "Exchange should not have inputs");
+    }
+    // Test the other two fragments
+    assertTrue(fragments.get(1) instanceof LeafTimeSeriesPlanNode, "Expected leaf node in fragment");
+    assertTrue(fragments.get(2) instanceof LeafTimeSeriesPlanNode, "Expected leaf node in fragment");
+  }
+
+  @Test
+  public void testGetFragmentsForSingleServerQuery() {
+    /*
+     * Create Input:
+     *          Node-1
+     *         /      \
+     *      Node-2   Leaf-2
+     *       /
+     *    Leaf-1
+     * Expected Outputs (the plan is not split for single-server queries):
+     *  Fragment-1:
+     *    Exchange (named Node-1, no inputs, no agg info)
+     *  Fragment-2:
+     *    The original plan rooted at Node-1, unchanged
+     */
+    BaseTimeSeriesPlanNode nodeOne = createSamplePlan();
+    List<BaseTimeSeriesPlanNode> fragments = TimeSeriesPlanFragmenter.getFragments(nodeOne, true);
+    assertEquals(fragments.size(), 2, "Expect only 2 fragments for single-server query");
+    BaseTimeSeriesPlanNode brokerFragment = fragments.get(0);
+    assertTrue(brokerFragment instanceof TimeSeriesExchangeNode, "Broker fragment should be an exchange");
+    assertEquals(brokerFragment.getId(), "Node-1");
+    assertTrue(brokerFragment.getInputs().isEmpty(), "Broker exchange should not have inputs");
+    assertNull(((TimeSeriesExchangeNode) brokerFragment).getAggInfo());
+    // The server fragment is the caller's plan itself, not a copy.
+    assertSame(fragments.get(1), nodeOne);
+  }
+
+  @Test
+  public void testGetFragmentsWithSinglePlanNode() {
+    /*
+     * Create Input:
+     *    Leaf-1
+     * Expected Outputs:
+     *  Fragment-1:
+     *    Exchange (named Leaf-1)
+     *  Fragment-2:
+     *    Leaf-1
+     */
+    LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+    List<BaseTimeSeriesPlanNode> fragments = TimeSeriesPlanFragmenter.getFragments(leafOne, false);
+    assertEquals(fragments.size(), 2);
+    assertTrue(fragments.get(0) instanceof TimeSeriesExchangeNode);
+    assertTrue(fragments.get(0).getInputs().isEmpty(), "Exchange should not have inputs");
+    assertTrue(fragments.get(1) instanceof LeafTimeSeriesPlanNode);
+    assertEquals(fragments.get(0).getId(), fragments.get(1).getId());
+  }
+
+  /**
+   * Builds the plan used by the multi-node tests: Node-1 with inputs [Node-2, Leaf-2], where Node-2 has input Leaf-1.
+   */
+  private BaseTimeSeriesPlanNode createSamplePlan() {
+    LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+    LeafTimeSeriesPlanNode leafTwo = createMockLeafNode("Leaf-2");
+    BaseTimeSeriesPlanNode nodeTwo = new MockTimeSeriesPlanNode("Node-2", Collections.singletonList(leafOne));
+    return new MockTimeSeriesPlanNode("Node-1", ImmutableList.of(nodeTwo, leafTwo));
+  }
+
+  private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
+    return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), "someTableName", "someTimeColumn",
+        TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
+  }
+
+  static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
+    public MockTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> inputs) {
+      super(id, inputs);
+    }
+
+    @Override
+    public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) {
+      return new MockTimeSeriesPlanNode(_id, newInputs);
+    }
+
+    @Override
+    public String getKlass() {
+      return "";
+    }
+
+    @Override
+    public String getExplainName() {
+      return "";
+    }
+
+    @Override
+    public BaseTimeSeriesOperator run() {
+      return null;
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]