This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b5858b5b4 [timeseries] Part-3: Add Time Series Exchange Operator, 
Plan Node and Serde (#14611)
1b5858b5b4 is described below

commit 1b5858b5b4b5dffe45d5af73f3604d85bfa36132
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Dec 10 11:39:27 2024 -0600

    [timeseries] Part-3: Add Time Series Exchange Operator, Plan Node and Serde 
(#14611)
    
    * Part-3: Add Time Series Exchange Operator, Plan Node and Serde
    
    * add tests + fragmenter
    
    * add license
    
    * address feedback
    
    * address more feedback
---
 .../TimeSeriesExchangeReceiveOperator.java         | 182 +++++++++++++++++++
 .../TimeSeriesExchangeReceivePlanNode.java         |  74 ++++++++
 .../timeseries/serde/TimeSeriesBlockSerde.java     | 201 +++++++++++++++++++++
 .../TimeSeriesExchangeReceiveOperatorTest.java     | 156 ++++++++++++++++
 .../timeseries/serde/TimeSeriesBlockSerdeTest.java | 135 ++++++++++++++
 .../pinot/tsdb/planner/TimeSeriesExchangeNode.java |  75 ++++++++
 .../tsdb/planner/TimeSeriesPlanFragmenter.java     | 118 ++++++++++++
 .../tsdb/planner/TimeSeriesPlanFragmenterTest.java | 167 +++++++++++++++++
 8 files changed, 1108 insertions(+)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
new file mode 100644
index 0000000000..79d49e0b46
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+/**
+ * <h2>Overview</h2>
+ * Receives and optionally aggregates the response from all servers for the 
corresponding plan node.
+ *
+ * <h3>Aggregate Receive</h3>
+ * When a non-null {@link AggInfo} is passed, this operator will aggregate the 
received data using the corresponding
+ * series builder created via {@link TimeSeriesBuilderFactory}.
+ *
+ * <h3>Non-Aggregate Receive</h3>
+ * When a null AggInfo is passed, then we don't perform any aggregation. If we 
receive series with the same ID from
+ * different servers, we will simply append them to the list, creating a union.
+ */
+public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
+  /**
+   * Receiver will receive either TimeSeriesBlock or Throwable. And will have 
at most _numServersQueried objects that
+   * can be polled.
+   */
+  private final BlockingQueue<Object> _receiver;
+  private final long _deadlineMs;
+  private final int _numServersQueried;
+  @Nullable
+  private final AggInfo _aggInfo;
+  private final TimeSeriesBuilderFactory _factory;
+
+  public TimeSeriesExchangeReceiveOperator(BlockingQueue<Object> receiver, 
long deadlineMs, int numServersQueried,
+      @Nullable AggInfo aggInfo, TimeSeriesBuilderFactory 
seriesBuilderFactory) {
+    super(Collections.emptyList());
+    Preconditions.checkArgument(numServersQueried > 0, "No servers to query in 
receive operator");
+    _receiver = receiver;
+    _deadlineMs = deadlineMs;
+    _numServersQueried = numServersQueried;
+    _aggInfo = aggInfo;
+    _factory = seriesBuilderFactory;
+  }
+
+  @Override
+  public TimeSeriesBlock getNextBlock() {
+    try {
+      if (_aggInfo == null) {
+        return getNextBlockNoAggregation();
+      } else {
+        return getNextBlockWithAggregation();
+      }
+    } catch (Throwable t) {
+      throw new RuntimeException(t.getMessage(), t);
+    }
+  }
+
+  @Override
+  public String getExplainName() {
+    return "TIME_SERIES_EXCHANGE_RECEIVE";
+  }
+
+  @VisibleForTesting
+  protected Object poll(long remainingTimeMs)
+      throws InterruptedException {
+    return _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
+  }
+
+  private TimeSeriesBlock getNextBlockWithAggregation()
+      throws Throwable {
+    TimeBuckets timeBuckets = null;
+    Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
+    for (int index = 0; index < _numServersQueried; index++) {
+      // Step-1: Poll, and ensure we received a TimeSeriesBlock.
+      long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
+      Preconditions.checkState(remainingTimeMs > 0,
+          "Timed out before polling all servers. Successfully Polled: %s of 
%s", index, _numServersQueried);
+      Object result = poll(remainingTimeMs);
+      Preconditions.checkNotNull(result, "Timed out waiting for response. 
Waited: %s ms", remainingTimeMs);
+      if (result instanceof Throwable) {
+        throw (Throwable) result;
+      }
+      Preconditions.checkState(result instanceof TimeSeriesBlock,
+          "Found unexpected object. This is a bug: %s", result.getClass());
+      // Step-2: Init timeBuckets and ensure they are the same across all 
servers.
+      TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+      if (timeBuckets == null) {
+        timeBuckets = blockToMerge.getTimeBuckets();
+      } else {
+        
Preconditions.checkState(timeBuckets.equals(blockToMerge.getTimeBuckets()),
+            "Found unequal time buckets from server response");
+      }
+      // Step-3: Merge new block with existing block.
+      for (var entry : blockToMerge.getSeriesMap().entrySet()) {
+        long seriesHash = entry.getKey();
+        List<TimeSeries> currentSeriesList = entry.getValue();
+        TimeSeries sampledTimeSeries = currentSeriesList.get(0);
+        // Init seriesBuilder if required
+        BaseTimeSeriesBuilder seriesBuilder = seriesBuilderMap.get(seriesHash);
+        if (seriesBuilder == null) {
+          seriesBuilder = _factory.newTimeSeriesBuilder(
+              _aggInfo, Long.toString(seriesHash), timeBuckets, 
sampledTimeSeries.getTagNames(),
+              sampledTimeSeries.getTagValues());
+          seriesBuilderMap.put(seriesHash, seriesBuilder);
+        }
+        for (TimeSeries timeSeries : currentSeriesList) {
+          seriesBuilder.mergeAlignedSeries(timeSeries);
+        }
+      }
+    }
+    // Convert series builders to series and return.
+    Map<Long, List<TimeSeries>> seriesMap = new 
HashMap<>(seriesBuilderMap.size());
+    for (var entry : seriesBuilderMap.entrySet()) {
+      long seriesHash = entry.getKey();
+      List<TimeSeries> timeSeriesList = new ArrayList<>();
+      timeSeriesList.add(entry.getValue().build());
+      seriesMap.put(seriesHash, timeSeriesList);
+    }
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
+
+  private TimeSeriesBlock getNextBlockNoAggregation()
+      throws Throwable {
+    Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
+    TimeBuckets timeBuckets = null;
+    for (int index = 0; index < _numServersQueried; index++) {
+      long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
+      Preconditions.checkState(remainingTimeMs > 0, "Timed out before polling 
exchange receive");
+      Object result = _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
+      Preconditions.checkNotNull(result, "Timed out waiting for response. 
Waited: %s ms", remainingTimeMs);
+      if (result instanceof Throwable) {
+        throw ((Throwable) result);
+      }
+      Preconditions.checkState(result instanceof TimeSeriesBlock,
+          "Found unexpected object. This is a bug: %s", result.getClass());
+      TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+      if (timeBuckets == null) {
+        timeBuckets = blockToMerge.getTimeBuckets();
+      } else {
+        
Preconditions.checkState(timeBuckets.equals(blockToMerge.getTimeBuckets()),
+            "Found unequal time buckets from server response");
+      }
+      for (var entry : blockToMerge.getSeriesMap().entrySet()) {
+        long seriesHash = entry.getKey();
+        List<TimeSeries> timeSeriesList = entry.getValue();
+        timeSeriesMap.computeIfAbsent(seriesHash, x -> new 
ArrayList<>()).addAll(timeSeriesList);
+      }
+    }
+    Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange 
receive operator");
+    return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
new file mode 100644
index 0000000000..42416563e5
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+/**
+ * With the broker-reduce mode in Time Series Engine, this node becomes the 
leaf stage for the broker. In other words,
+ * the plan fragment that runs in the broker will always have this node in the 
leaves.
+ */
+public class TimeSeriesExchangeReceivePlanNode extends BaseTimeSeriesPlanNode {
+  private final long _deadlineMs;
+  private final AggInfo _aggInfo;
+  private final TimeSeriesBuilderFactory _factory;
+  private BlockingQueue<Object> _receiver;
+  private int _numServersQueried;
+
+  public TimeSeriesExchangeReceivePlanNode(String id, long deadlineMs, 
@Nullable AggInfo aggInfo,
+      TimeSeriesBuilderFactory factory) {
+    super(id, Collections.emptyList());
+    _deadlineMs = deadlineMs;
+    _aggInfo = aggInfo;
+    _factory = factory;
+  }
+
+  public void init(BlockingQueue<Object> receiver, int numExpectedBlocks) {
+    _receiver = receiver;
+    _numServersQueried = numExpectedBlocks;
+  }
+
+  @Override
+  public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> 
newInputs) {
+    return new TimeSeriesExchangeReceivePlanNode(_id, _deadlineMs, _aggInfo, 
_factory);
+  }
+
+  @Override
+  public String getKlass() {
+    return TimeSeriesExchangeReceivePlanNode.class.getName();
+  }
+
+  @Override
+  public String getExplainName() {
+    return "TIME_SERIES_EXCHANGE_RECEIVE";
+  }
+
+  @Override
+  public BaseTimeSeriesOperator run() {
+    return new TimeSeriesExchangeReceiveOperator(_receiver, _deadlineMs, 
_numServersQueried, _aggInfo, _factory);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
new file mode 100644
index 0000000000..cdbf668123
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries.serde;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * Implements a simple Serde mechanism for the Time Series Block. This is used 
for transferring data between servers
+ * and brokers. The approach is to use a {@link TransferableBlock} and rely on 
the existing serialization code to avoid
+ * re-inventing the wheel. Once the time-series engine coalesces with the 
Multistage Engine, we will anyway use
+ * TransferableBlock for data transfers.
+ * <p>
+ *   The {@link TimeSeriesBlock} is converted to and from a table, where the 
first row contains information about the
+ *   time-buckets. For each tag/label in the query, there's a dedicated 
column, and the Double values are stored in
+ *   the last column. As an example, consider the following, where FBV 
represents the first bucket value of TimeBuckets.
+ *   <pre>
+ *     
+-------------+------------+-------------+---------------------------------+
+ *     | tag-0       | tag-1      | tag-n       | values                       
   |
+ *     
+-------------+------------+-------------+---------------------------------+
+ *     | null        | null       | null        | [FBV, bucketSize, 
numBuckets]   |
+ *     
+-------------+------------+-------------+---------------------------------+
+ *     | Chicago     | 60607      | ...         | [value-0, value-1, ... 
value-x] |
+ *     
+-------------+------------+-------------+---------------------------------+
+ *     | San Fran.   | 94107      | ...         | [value-0, value-1, ... 
value-x] |
+ *     
+-------------+------------+-------------+---------------------------------+
+ *   </pre>
+ *   TODO(timeseries): When we support Time Series selection queries, we will 
likely need a special column instead of
+ *     tags, because one could store data in JSON Blobs and the series may 
have different tags/labels.
+ * </p>
+ * <p>
+ *  TODO(timeseries): One source of inefficiency is boxing/unboxing of Double 
arrays.
+ *  TODO(timeseries): The other is tag values being Object[]. We should make 
tag values String[].
+ * </p>
+ */
+public class TimeSeriesBlockSerde {
+  /**
+   * Since DataBlock can only handle primitive double[] arrays, we use 
Double.MIN_VALUE to represent nulls.
+   * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN 
can help detect divide by 0.
+   * TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
+   */
+  private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
+
+  private TimeSeriesBlockSerde() {
+  }
+
+  public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer 
readOnlyByteBuffer)
+      throws IOException {
+    DataBlock dataBlock = DataBlockUtils.readFrom(readOnlyByteBuffer);
+    TransferableBlock transferableBlock = 
TransferableBlockUtils.wrap(dataBlock);
+    List<String> tagNames = 
generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
+        "Missing data schema in TransferableBlock"));
+    List<Object[]> container = transferableBlock.getContainer();
+    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    for (int index = 1; index < container.size(); index++) {
+      Object[] row = container.get(index);
+      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+      long seriesId = Long.parseLong(timeSeries.getId());
+      seriesMap.computeIfAbsent(seriesId, x -> new 
ArrayList<>()).add(timeSeries);
+    }
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
+
+  public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock 
timeSeriesBlock)
+      throws IOException {
+    TimeBuckets timeBuckets = 
Objects.requireNonNull(timeSeriesBlock.getTimeBuckets());
+    List<Object[]> container = new ArrayList<>();
+    DataSchema dataSchema = generateDataSchema(timeSeriesBlock);
+    container.add(timeBucketsToRow(timeBuckets, dataSchema));
+    for (var entry : timeSeriesBlock.getSeriesMap().entrySet()) {
+      for (TimeSeries timeSeries : entry.getValue()) {
+        container.add(timeSeriesToRow(timeSeries, dataSchema));
+      }
+    }
+    TransferableBlock transferableBlock = new TransferableBlock(container, 
dataSchema, DataBlock.Type.ROW);
+    return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
+  }
+
+  private static DataSchema generateDataSchema(TimeSeriesBlock 
timeSeriesBlock) {
+    TimeSeries sampledTimeSeries = 
sampleTimeSeries(timeSeriesBlock).orElse(null);
+    int numTags = sampledTimeSeries == null ? 0 : 
sampledTimeSeries.getTagNames().size();
+    ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+    String[] columnNames = new String[numTags + 1];
+    for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
+      columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
+      dataTypes[tagIndex] = ColumnDataType.STRING;
+    }
+    columnNames[numTags] = "__ts_values";
+    dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+    return new DataSchema(columnNames, dataTypes);
+  }
+
+  private static List<String> generateTagNames(DataSchema dataSchema) {
+    String[] columnNames = dataSchema.getColumnNames();
+    List<String> tagNames = new ArrayList<>(columnNames.length - 1);
+    for (int index = 0; index < columnNames.length - 1; index++) {
+      tagNames.add(columnNames[index]);
+    }
+    return tagNames;
+  }
+
+  private static Optional<TimeSeries> sampleTimeSeries(TimeSeriesBlock 
timeSeriesBlock) {
+    if (timeSeriesBlock.getSeriesMap().isEmpty()) {
+      return Optional.empty();
+    }
+    List<TimeSeries> timeSeriesList = 
timeSeriesBlock.getSeriesMap().values().iterator().next();
+    Preconditions.checkState(!timeSeriesList.isEmpty(), "Found empty 
time-series list");
+    return Optional.of(timeSeriesList.get(0));
+  }
+
+  private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema 
dataSchema) {
+    int numColumns = dataSchema.getColumnNames().length;
+    Object[] result = new Object[numColumns];
+    for (int index = 0; index < numColumns - 1; index++) {
+      result[index] = "null";
+    }
+    double firstBucketValue = timeBuckets.getTimeBuckets()[0];
+    double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
+    double numBuckets = timeBuckets.getNumBuckets();
+    result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds, 
numBuckets};
+    return result;
+  }
+
+  private static TimeBuckets timeBucketsFromRow(Object[] row) {
+    double[] values = (double[]) row[row.length - 1];
+    long fbv = (long) values[0];
+    Duration window = Duration.ofSeconds((long) values[1]);
+    int numBuckets = (int) values[2];
+    return TimeBuckets.ofSeconds(fbv, window, numBuckets);
+  }
+
+  private static Object[] timeSeriesToRow(TimeSeries timeSeries, DataSchema 
dataSchema) {
+    int numColumns = dataSchema.getColumnNames().length;
+    Object[] result = new Object[numColumns];
+    for (int index = 0; index < numColumns - 1; index++) {
+      Object tagValue = timeSeries.getTagValues()[index];
+      result[index] = tagValue == null ? "null" : tagValue.toString();
+    }
+    result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+    return result;
+  }
+
+  private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] 
row, TimeBuckets timeBuckets) {
+    Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+    Object[] tagValues = new Object[row.length - 1];
+    System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+    return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null, 
timeBuckets, values, tagNames, tagValues);
+  }
+
+  private static double[] unboxDoubleArray(Double[] values) {
+    double[] result = new double[values.length];
+    for (int index = 0; index < result.length; index++) {
+      result[index] = values[index] == null ? NULL_PLACEHOLDER : values[index];
+    }
+    return result;
+  }
+
+  private static Double[] boxDoubleArray(double[] values) {
+    Double[] result = new Double[values.length];
+    for (int index = 0; index < result.length; index++) {
+      result[index] = values[index] == NULL_PLACEHOLDER ? null : values[index];
+    }
+    return result;
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
new file mode 100644
index 0000000000..c9fd929333
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesExchangeReceiveOperatorTest {
+  private static final int NUM_SERVERS_QUERIED = 3;
+  private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null);
+  private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, 
Duration.ofSeconds(200), 4);
+  private static final List<String> TAG_NAMES = ImmutableList.of("city", 
"zip");
+  private static final Object[] CHICAGO_SERIES_VALUES = new 
Object[]{"Chicago", "60605"};
+  private static final Object[] SF_SERIES_VALUES = new Object[]{"San 
Francisco", "94107"};
+  private static final Long CHICAGO_SERIES_HASH = 
TimeSeries.hash(CHICAGO_SERIES_VALUES);
+  private static final Long SF_SERIES_HASH = TimeSeries.hash(SF_SERIES_VALUES);
+  private static final SimpleTimeSeriesBuilderFactory SERIES_BUILDER_FACTORY = 
new SimpleTimeSeriesBuilderFactory();
+
+  @Test
+  public void testGetNextBlockWithAggregation() {
+    // Setup test
+    long deadlineMs = Long.MAX_VALUE;
+    ArrayBlockingQueue<Object> blockingQueue = new 
ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+    blockingQueue.addAll(generateTimeSeriesBlocks());
+    TimeSeriesExchangeReceiveOperator operator = new 
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+        NUM_SERVERS_QUERIED, SUM_AGG_INFO, SERIES_BUILDER_FACTORY);
+    // Run test
+    TimeSeriesBlock block = operator.nextBlock();
+    // Validate results
+    assertEquals(block.getSeriesMap().size(), 2);
+    assertTrue(block.getSeriesMap().containsKey(CHICAGO_SERIES_HASH), "Chicago 
series not present in received block");
+    assertTrue(block.getSeriesMap().containsKey(SF_SERIES_HASH), "SF series 
not present in received block");
+    assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1, 
"Expected 1 series for Chicago");
+    assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 
1 series for SF");
+    // Ensure Chicago had series addition performed
+    Double[] chicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+    assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0});
+    // Ensure SF had input series unmodified
+    Double[] sanFranciscoSeriesValues = 
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+    assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
+  }
+
+  @Test
+  public void testGetNextBlockNoAggregation() {
+    // Setup test
+    long deadlineMs = Long.MAX_VALUE;
+    ArrayBlockingQueue<Object> blockingQueue = new 
ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+    blockingQueue.addAll(generateTimeSeriesBlocks());
+    TimeSeriesExchangeReceiveOperator operator = new 
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+        NUM_SERVERS_QUERIED, null, SERIES_BUILDER_FACTORY);
+    // Run test
+    TimeSeriesBlock block = operator.nextBlock();
+    // Validate results
+    assertEquals(block.getSeriesMap().size(), 2);
+    assertTrue(block.getSeriesMap().containsKey(CHICAGO_SERIES_HASH), "Chicago 
series not present in received block");
+    assertTrue(block.getSeriesMap().containsKey(SF_SERIES_HASH), "SF series 
not present in received block");
+    assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2, 
"Expected 2 series for Chicago");
+    assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 
1 series for SF");
+    // Ensure Chicago has unmodified series values
+    Double[] firstChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+    Double[] secondChicagoSeriesValues = 
block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues();
+    assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
+    assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
+    // Ensure SF has input unmodified series values
+    Double[] sanFranciscoSeriesValues = 
block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+    assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 
10.0});
+  }
+
+  @Test
+  public void testGetNextBlockFailure() {
+    // Setup test
+    long deadlineMs = Long.MAX_VALUE;
+    ArrayBlockingQueue<Object> blockingQueue = new 
ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+    blockingQueue.add(new TimeoutException("Test error"));
+    // Run test with aggregation
+    try {
+      TimeSeriesExchangeReceiveOperator operator = new 
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+          NUM_SERVERS_QUERIED, SUM_AGG_INFO, SERIES_BUILDER_FACTORY);
+      TimeSeriesBlock block = operator.nextBlock();
+      fail();
+    } catch (Throwable t) {
+      assertEquals(t.getMessage(), "Test error");
+    }
+    blockingQueue.add(new TimeoutException("Test error"));
+    try {
+      TimeSeriesExchangeReceiveOperator operator = new 
TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+          NUM_SERVERS_QUERIED, null, SERIES_BUILDER_FACTORY);
+      TimeSeriesBlock block = operator.nextBlock();
+      fail();
+    } catch (Throwable t) {
+      assertEquals(t.getMessage(), "Test error");
+    }
+  }
+
+  private List<TimeSeriesBlock> generateTimeSeriesBlocks() {
+    List<TimeSeriesBlock> seriesBlocks = new ArrayList<>();
+    {
+      Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+      seriesMap.put(CHICAGO_SERIES_HASH, 
ImmutableList.of(createChicagoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+      seriesMap.put(SF_SERIES_HASH, 
ImmutableList.of(createSanFranciscoSeries(new Double[]{10.0, 10.0, 10.0, 
10.0})));
+      seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+    }
+    {
+      Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+      seriesMap.put(CHICAGO_SERIES_HASH, 
ImmutableList.of(createChicagoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+      seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+    }
+    {
+      Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+      seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+    }
+    // Shuffle the output to test multiple scenarios over time
+    Collections.shuffle(seriesBlocks);
+    return seriesBlocks;
+  }
+
+  private TimeSeries createChicagoSeries(Double[] values) {
+    return new TimeSeries(CHICAGO_SERIES_HASH.toString(), null, TIME_BUCKETS, 
values, TAG_NAMES, CHICAGO_SERIES_VALUES);
+  }
+
+  private TimeSeries createSanFranciscoSeries(Double[] values) {
+    return new TimeSeries(SF_SERIES_HASH.toString(), null, TIME_BUCKETS, 
values, TAG_NAMES, SF_SERIES_VALUES);
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
new file mode 100644
index 0000000000..f08d39ca0a
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries.serde;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesBlockSerdeTest {
+  private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, 
Duration.ofSeconds(200), 5);
+
+  @Test
+  public void testSerde()
+      throws IOException {
+    // To test serde of TimeSeriesBlock, we do the following:
+    // 1. Serialize the time-series block (say Block-1) to get ByteString-1
+    // 2. Deserialize ByteString-1 to get Block-2.
+    // 3. Serialize Block-2 to get ByteString-2.
+    // 4. Compare ByteString-1 and ByteString-2.
+    // 5. Compare values of Block-1 and Block-2.
+    List<TimeSeriesBlock> blocks = List.of(buildBlockWithNoTags(), 
buildBlockWithSingleTag(),
+        buildBlockWithMultipleTags());
+    for (TimeSeriesBlock block1 : blocks) {
+      // Serialize, deserialize and serialize again
+      ByteString byteString1 = 
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
+      String serializedBlockString1 = byteString1.toStringUtf8();
+      TimeSeriesBlock block2 = 
TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer());
+      String serializedBlockString2 = 
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block2).toStringUtf8();
+      // Serialized blocks in both cases should be the same since 
serialization is deterministic.
+      assertEquals(serializedBlockString1, serializedBlockString2);
+      // Compare block1 and block2
+      compareBlocks(block1, block2);
+    }
+  }
+
+  /**
+   * Compares time series blocks in a way which makes it easy to debug test 
failures when/if they happen in CI.
+   */
+  private static void compareBlocks(TimeSeriesBlock block1, TimeSeriesBlock 
block2) {
+    assertEquals(block1.getTimeBuckets(), block2.getTimeBuckets(), "Time 
buckets are different across blocks");
+    assertEquals(block1.getSeriesMap().size(), block2.getSeriesMap().size(), 
String.format(
+        "Different number of series in blocks: %s and %s", 
block1.getSeriesMap().size(), block2.getSeriesMap().size()));
+    assertEquals(block1.getSeriesMap().keySet(), 
block2.getSeriesMap().keySet(),
+        String.format("Series blocks have different keys: %s vs %s",
+            block1.getSeriesMap().keySet(), block2.getSeriesMap().keySet()));
+    for (long seriesHash : block1.getSeriesMap().keySet()) {
+      List<TimeSeries> seriesList1 = block1.getSeriesMap().get(seriesHash);
+      List<TimeSeries> seriesList2 = block2.getSeriesMap().get(seriesHash);
+      compareTimeSeries(seriesList1, seriesList2);
+    }
+  }
+
+  private static void compareTimeSeries(List<TimeSeries> series1, 
List<TimeSeries> series2) {
+    assertEquals(series1.size(), series2.size(),
+        String.format("Different count of series with the same id: %s vs %s", 
series1.size(), series2.size()));
+    for (int index = 0; index < series1.size(); index++) {
+      TimeSeries seriesOne = series1.get(index);
+      TimeSeries seriesTwo = series2.get(index);
+      assertEquals(seriesOne.getTagNames(), seriesTwo.getTagNames());
+      assertEquals(seriesOne.getValues(), seriesTwo .getValues());
+    }
+  }
+
+  private static TimeSeriesBlock buildBlockWithNoTags() {
+    TimeBuckets timeBuckets = TIME_BUCKETS;
+    // Single series: []
+    List<String> tagNames = Collections.emptyList();
+    Object[] seriesValues = new Object[0];
+    long seriesHash = TimeSeries.hash(seriesValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesHash), null, timeBuckets,
+        new Double[]{null, 123.0, 0.0, 1.0}, tagNames, seriesValues)));
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
+
+  private static TimeSeriesBlock buildBlockWithSingleTag() {
+    TimeBuckets timeBuckets = TIME_BUCKETS;
+    // Series are: [cityId=Chicago] and [cityId=San Francisco]
+    List<String> tagNames = ImmutableList.of("cityId");
+    Object[] seriesOneValues = new Object[]{"Chicago"};
+    Object[] seriesTwoValues = new Object[]{"San Francisco"};
+    long seriesOneHash = TimeSeries.hash(seriesOneValues);
+    long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesOneHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+        new Double[]{null, 123.0, 0.0, 1.0}, tagNames, seriesOneValues)));
+    seriesMap.put(seriesTwoHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+        new Double[]{null, null, null, null}, tagNames, seriesTwoValues)));
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
+
+  private static TimeSeriesBlock buildBlockWithMultipleTags() {
+    TimeBuckets timeBuckets = TIME_BUCKETS;
+    // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco, 
zip=94107]
+    List<String> tagNames = ImmutableList.of("cityId", "zip");
+    Object[] seriesOneValues = new Object[]{"Chicago", "60605"};
+    Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"};
+    long seriesOneHash = TimeSeries.hash(seriesOneValues);
+    long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    seriesMap.put(seriesOneHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+        new Double[]{null, 123.0, Double.NaN, 1.0}, tagNames, 
seriesOneValues)));
+    seriesMap.put(seriesTwoHash, ImmutableList.of(new 
TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+        new Double[]{Double.NaN, -1.0, -1231231.0, 3.14}, tagNames, 
seriesTwoValues)));
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
new file mode 100644
index 0000000000..399a2bd28d
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+/**
+ * This node exists in the logical plan, but not in the physical/dispatchable 
plans. Similar to the
+ * {@link LeafTimeSeriesPlanNode}, a physical plan visitor will convert to its 
equivalent physical plan node, which will
+ * be capable of returning an executable operator with the {@link #run()} 
method.
+ * <br />
+ * <b>Note:</b> This node doesn't exist in the pinot-timeseries-spi because we 
don't want to let language developers
+ *   control how and when exchange will be run (as of now).
+ */
public class TimeSeriesExchangeNode extends BaseTimeSeriesPlanNode {
  // Aggregation to apply when combining series received over the exchange.
  // NOTE(review): presumably null means received series are merged without re-aggregation --
  // confirm against the physical plan node this is converted into.
  @Nullable
  private final AggInfo _aggInfo;

  /**
   * Creates an exchange node. Jackson uses this constructor to deserialize logical plans,
   * so the property names must stay in sync with the serialized form.
   */
  @JsonCreator
  public TimeSeriesExchangeNode(@JsonProperty("id") String id,
      @JsonProperty("inputs") List<BaseTimeSeriesPlanNode> inputs,
      @Nullable @JsonProperty("aggInfo") AggInfo aggInfo) {
    super(id, inputs);
    _aggInfo = aggInfo;
  }

  /** Returns the aggregation info for this exchange, or null when none was set. */
  @Nullable
  public AggInfo getAggInfo() {
    return _aggInfo;
  }

  /** Returns a copy of this node with the same id and aggInfo but new inputs. */
  @Override
  public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) {
    return new TimeSeriesExchangeNode(_id, newInputs, _aggInfo);
  }

  @Override
  public String getKlass() {
    return TimeSeriesExchangeNode.class.getName();
  }

  @Override
  public String getExplainName() {
    return "TIME_SERIES_BROKER_RECEIVE";
  }

  /**
   * Always throws: this node only exists in the logical plan and must be replaced by a
   * physical plan node before execution (see class-level Javadoc).
   */
  @Override
  public BaseTimeSeriesOperator run() {
    throw new IllegalStateException("Time Series Exchange should have been replaced with a physical plan node");
  }
}
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
new file mode 100644
index 0000000000..46a3f68c31
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+/**
+ * Fragments the plan into executable units. Since we only support 
Broker-Reduce for Time Series Queries at present,
+ * we will have 1 fragment for the broker, and 1 fragment for each {@link 
LeafTimeSeriesPlanNode}.
+ * <p>
+ * As an example, say we have the following plan:
+ *   <pre>
+ *                            +------------+
+ *                            | Node-1     |
+ *                            +------------+
+ *                          /              \
+ *                   +------------+     +------------+
+ *                   | Node-2     |     | Leaf-2     |
+ *                   +------------+     +------------+
+ *                     /
+ *            +------------+
+ *            | Leaf-1     |
+ *            +------------+
+ *   </pre>
+ *   The plan fragmenter will convert this to:
+ *   <pre>
+ *     This is fragment-1, aka the Broker's plan fragment:
+ *
+ *                            +------------+
+ *                            | Node-1     |
+ *                            +------------+
+ *                          /              \
+ *                   +------------+     +------------+
+ *                   | Node-2     |     | Exchange   |
+ *                   +------------+     +------------+
+ *                     /
+ *            +------------+
+ *            | Exchange   |
+ *            +------------+
+ *   </pre>
+ *   <pre>
+ *     This is fragment-2:
+ *            +------------+
+ *            | Leaf-1     |
+ *            +------------+
+ *   </pre>
+ *   <pre>
+ *     This is fragment-3:
+ *            +------------+
+ *            | Leaf-2     |
+ *            +------------+
+ *   </pre>
+ * </p>
+ */
+public class TimeSeriesPlanFragmenter {
+  private TimeSeriesPlanFragmenter() {
+  }
+
+  /**
+   * Fragments the plan as described in {@link TimeSeriesPlanFragmenter}. The 
first element of the list is the broker
+   * fragment, and the other elements are the server fragments. For 
single-node queries, this pushes down the entire
+   * plan to the servers.
+   * <p>
+   *   <b>Note:</b> This method may return cloned plan nodes, so you should 
use them as the plan subsequently.
+   * </p>
+   */
+  public static List<BaseTimeSeriesPlanNode> 
getFragments(BaseTimeSeriesPlanNode rootNode,
+      boolean isSingleServerQuery) {
+    List<BaseTimeSeriesPlanNode> result = new ArrayList<>();
+    Context context = new Context();
+    if (isSingleServerQuery) {
+      final String id = rootNode.getId();
+      return ImmutableList.of(new TimeSeriesExchangeNode(id, 
Collections.emptyList(), null), rootNode);
+    }
+    result.add(fragmentRecursively(rootNode, context));
+    result.addAll(context._fragments);
+    return result;
+  }
+
+  private static BaseTimeSeriesPlanNode 
fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) {
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
+      LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode;
+      context._fragments.add(leafNode.withInputs(Collections.emptyList()));
+      return new TimeSeriesExchangeNode(planNode.getId(), 
Collections.emptyList(), leafNode.getAggInfo());
+    }
+    List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
+    for (BaseTimeSeriesPlanNode input : planNode.getInputs()) {
+      newInputs.add(fragmentRecursively(input, context));
+    }
+    return planNode.withInputs(newInputs);
+  }
+
+  private static class Context {
+    private final List<BaseTimeSeriesPlanNode> _fragments = new ArrayList<>();
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
new file mode 100644
index 0000000000..8727f64ddc
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesPlanFragmenterTest {
+  @Test
+  public void testGetFragmentsWithMultipleLeafNodes() {
+    /*
+     * Create Input:
+     *         Node-1
+     *        /     \
+     *      Node-2  Leaf-2
+     *       /
+     *     Leaf-1
+     * Expected Outputs:
+     *   Fragment-1:
+     *         Node-1
+     *        /     \
+     *      Node-2  Exchange (named Leaf-2)
+     *       /
+     *     Exchange (named Leaf-1)
+     *   Fragment-2:
+     *         Leaf-1
+     *   Fragment-3:
+     *         Leaf-2
+     */
+    LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+    LeafTimeSeriesPlanNode leafTwo = createMockLeafNode("Leaf-2");
+    BaseTimeSeriesPlanNode nodeTwo = new MockTimeSeriesPlanNode("Node-2", 
Collections.singletonList(leafOne));
+    BaseTimeSeriesPlanNode nodeOne = new MockTimeSeriesPlanNode("Node-1", 
ImmutableList.of(nodeTwo, leafTwo));
+    List<BaseTimeSeriesPlanNode> fragments = 
TimeSeriesPlanFragmenter.getFragments(nodeOne, false);
+    // Test whether correct number of fragments generated
+    assertEquals(fragments.size(), 3);
+    // Test whether fragment roots are correct
+    assertEquals(fragments.get(0).getId(), "Node-1");
+    assertEquals(fragments.get(1).getId(), "Leaf-1");
+    assertEquals(fragments.get(2).getId(), "Leaf-2");
+    // Test whether broker fragment has the right inputs
+    {
+      BaseTimeSeriesPlanNode brokerFragment = fragments.get(0);
+      assertEquals(brokerFragment.getInputs().size(), 2);
+      // Left and right inputs should have IDs Node-2 and Leaf-2.
+      BaseTimeSeriesPlanNode leftInput = brokerFragment.getInputs().get(0);
+      BaseTimeSeriesPlanNode rightInput = brokerFragment.getInputs().get(1);
+      assertEquals(leftInput.getId(), "Node-2");
+      assertEquals(rightInput.getId(), "Leaf-2");
+      // Right input should be exchange
+      assertTrue(rightInput instanceof TimeSeriesExchangeNode, "Node should 
have been replaced by Exchange");
+      // Input for Left input should be exchange
+      assertEquals(leftInput.getInputs().size(), 1);
+      assertEquals(leftInput.getInputs().get(0).getId(), "Leaf-1");
+      assertTrue(leftInput.getInputs().get(0) instanceof 
TimeSeriesExchangeNode);
+    }
+    // Test the other two fragments
+    assertTrue(fragments.get(1) instanceof LeafTimeSeriesPlanNode, "Expected 
leaf node in fragment");
+    assertTrue(fragments.get(2) instanceof LeafTimeSeriesPlanNode, "Expected 
leaf node in fragment");
+  }
+
+  @Test
+  public void testGetFragmentsForSingleServerQuery() {
+    /*
+     * Create Input:
+     *         Node-1
+     *        /     \
+     *      Node-2  Leaf-2
+     *       /
+     *     Leaf-1
+     * Expected Outputs:
+     *   Fragment-1:
+     *         Node-1
+     *        /     \
+     *      Node-2  Exchange (named Leaf-2)
+     *       /
+     *     Exchange (named Leaf-1)
+     *   Fragment-2:
+     *         Leaf-1
+     *   Fragment-3:
+     *         Leaf-2
+     */
+    LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+    LeafTimeSeriesPlanNode leafTwo = createMockLeafNode("Leaf-2");
+    BaseTimeSeriesPlanNode nodeTwo = new MockTimeSeriesPlanNode("Node-2", 
Collections.singletonList(leafOne));
+    BaseTimeSeriesPlanNode nodeOne = new MockTimeSeriesPlanNode("Node-1", 
ImmutableList.of(nodeTwo, leafTwo));
+    List<BaseTimeSeriesPlanNode> fragments = 
TimeSeriesPlanFragmenter.getFragments(nodeOne, true);
+    assertEquals(fragments.size(), 2, "Expect only 2 fragments for 
single-server query");
+    assertEquals(fragments.get(0).getId(), "Node-1");
+    assertEquals(fragments.get(1), nodeOne);
+  }
+
+  @Test
+  public void testGetFragmentsWithSinglePlanNode() {
+    /*
+     * Create Input:
+     *         Leaf-1
+     * Expected Outputs:
+     *   Fragment-1:
+     *       Exchange (named Leaf-1)
+     *   Fragment-2:
+     *         Leaf-1
+     */
+    LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+    List<BaseTimeSeriesPlanNode> fragments = 
TimeSeriesPlanFragmenter.getFragments(leafOne, false);
+    assertEquals(fragments.size(), 2);
+    assertTrue(fragments.get(0) instanceof TimeSeriesExchangeNode);
+    assertTrue(fragments.get(1) instanceof LeafTimeSeriesPlanNode);
+    assertEquals(fragments.get(0).getId(), fragments.get(1).getId());
+  }
+
+  private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
+    return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), 
"someTableName", "someTimeColumn",
+        TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
+  }
+
+  static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
+    public MockTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> 
inputs) {
+      super(id, inputs);
+    }
+
+    @Override
+    public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> 
newInputs) {
+      return new MockTimeSeriesPlanNode(_id, newInputs);
+    }
+
+    @Override
+    public String getKlass() {
+      return "";
+    }
+
+    @Override
+    public String getExplainName() {
+      return "";
+    }
+
+    @Override
+    public BaseTimeSeriesOperator run() {
+      return null;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to