This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b03b5e434f [multistage] Fix data block transfer semantics and plan 
node data schema usage. (#9064)
b03b5e434f is described below

commit b03b5e434f8eda78cbca3007d285453f1d48e1c4
Author: Rong Rong <[email protected]>
AuthorDate: Sat Jul 23 11:21:43 2022 -0700

    [multistage] Fix data block transfer semantics and plan node data schema 
usage. (#9064)
    
    * fix data schema usage in stage to operator
    
    * adding error handling to all block transfer
    
    * carve out test files
    
    * standardized channel metadata
    
    * standardized block transfer
    
    - always transfer data block then end-of-stream empty block (with or w/o 
error)
    - processing finishes when end-of-stream block is reached
      - if end-of-stream is error, propagate
      - if end-of-stream is no-error, send normal data block then another new 
end-of-stream with proper schema
    - error gets propagated all the way through to the end
    - do random selection and singleton differently
    
    * fix error block bubbling issue
    
    * address diff comments
    
    - remove empty block util, replace with end-of-stream
    - remove schema in error block
    - no longer repackage error block
    - upstream error blocks are stored in the intermediate stage so the next time
      getBlock() is called it will return the error
      - this should also log a warning because once an error block is sent, there
        should be no more upstream calls to getBlock()
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../MultiStageBrokerRequestHandler.java            |   2 +-
 .../pinot/core/common/datablock/BaseDataBlock.java |   4 +-
 .../core/common/datablock/DataBlockUtils.java      |  32 +++---
 .../pinot/core/common/datablock/MetadataBlock.java |   4 +
 .../pinot/core/common/datablock/DataBlockTest.java |   3 +-
 .../apache/pinot/query/QueryEnvironmentTest.java   |  20 ++++
 .../pinot/query/QueryEnvironmentTestBase.java      |   4 -
 .../pinot/query/mailbox/channel/ChannelUtils.java  |  29 +++++
 .../channel/MailboxContentStreamObserver.java      |   6 +-
 .../channel/MailboxStatusStreamObserver.java       |   5 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  82 ++++++++++++--
 .../query/runtime/blocks/TransferableBlock.java    |  55 ++++++++-
 .../runtime/blocks/TransferableBlockUtils.java     |  17 +--
 .../runtime/executor/WorkerQueryExecutor.java      |  21 ++--
 .../query/runtime/operator/AggregateOperator.java  |  82 +++++++-------
 .../query/runtime/operator/HashJoinOperator.java   |  57 ++++++----
 .../runtime/operator/MailboxReceiveOperator.java   |  17 ++-
 .../runtime/operator/MailboxSendOperator.java      | 124 ++++++++++-----------
 .../query/runtime/operator/TransformOperator.java  |  41 +++----
 .../pinot/query/service/QueryDispatcher.java       |  16 ++-
 .../query/mailbox/GrpcMailboxServiceTest.java      |   9 +-
 .../query/runtime/QueryRunnerExceptionTest.java    |  77 +++++++++++++
 .../pinot/query/runtime/QueryRunnerTest.java       |  87 +--------------
 .../pinot/query/runtime/QueryRunnerTestBase.java   | 109 ++++++++++++++++++
 24 files changed, 600 insertions(+), 303 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index e992031288..d7b7a1c802 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -146,7 +146,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, 
_mailboxService, DEFAULT_TIMEOUT_NANO);
     } catch (Exception e) {
-      LOGGER.info("query submission failed", e);
+      LOGGER.info("query execution failed", e);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 2af725dc6f..395bc49d05 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -102,11 +102,11 @@ public abstract class BaseDataBlock implements DataTable {
    * @param fixedSizeDataBytes byte[] for fix-sized columns.
    * @param variableSizeDataBytes byte[] for variable length columns (arrays).
    */
-  public BaseDataBlock(int numRows, DataSchema dataSchema, String[] 
stringDictionary,
+  public BaseDataBlock(int numRows, @Nullable DataSchema dataSchema, String[] 
stringDictionary,
       byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
     _numRows = numRows;
-    _numColumns = dataSchema.size();
     _dataSchema = dataSchema;
+    _numColumns = dataSchema == null ? 0 : dataSchema.size();
     _stringDictionary = stringDictionary;
     _fixedSizeDataBytes = fixedSizeDataBytes;
     _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index 1a26365210..d1d9185d11 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -21,11 +21,13 @@ package org.apache.pinot.core.common.datablock;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 
 
 public final class DataBlockUtils {
@@ -34,29 +36,25 @@ public final class DataBlockUtils {
     // do not instantiate.
   }
 
-  private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], 
new DataSchema.ColumnDataType[0]);
-  private static final MetadataBlock EOS_DATA_BLOCK = new 
MetadataBlock(EMPTY_SCHEMA);
-  static {
-    EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(), 
"END_OF_STREAM");
-  }
-
-  public static MetadataBlock getEndOfStreamDataBlock() {
-    return EOS_DATA_BLOCK;
-  }
-
   public static MetadataBlock getErrorDataBlock(Exception e) {
-    MetadataBlock errorBlock = new MetadataBlock(EMPTY_SCHEMA);
-    errorBlock._metadata.put(DataTable.MetadataKey.TABLE.getName(), "ERROR");
     if (e instanceof ProcessingException) {
-      errorBlock.addException(((ProcessingException) e).getErrorCode(), 
e.getMessage());
+      return getErrorDataBlock(Collections.singletonMap(((ProcessingException) 
e).getErrorCode(), e.getMessage()));
     } else {
-      errorBlock.addException(QueryException.UNKNOWN_ERROR_CODE, 
e.getMessage());
+      return 
getErrorDataBlock(Collections.singletonMap(QueryException.UNKNOWN_ERROR_CODE, 
e.getMessage()));
+    }
+  }
+
+  public static MetadataBlock getErrorDataBlock(Map<Integer, String> 
exceptions) {
+    MetadataBlock errorBlock = new MetadataBlock();
+    for (Map.Entry<Integer, String> exception : exceptions.entrySet()) {
+      errorBlock.addException(exception.getKey(), exception.getValue());
     }
     return errorBlock;
   }
 
-  public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) {
-    return dataSchema == null ? EOS_DATA_BLOCK : new MetadataBlock(dataSchema);
+  public static MetadataBlock getEndOfStreamDataBlock(@Nonnull DataSchema 
dataSchema) {
+    // TODO: add query statistics metadata for the block.
+    return new MetadataBlock(dataSchema);
   }
 
   public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index b100425e5a..d469ec0287 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -29,6 +29,10 @@ import org.apache.pinot.common.utils.DataSchema;
 public class MetadataBlock extends BaseDataBlock {
   private static final int VERSION = 1;
 
+  public MetadataBlock() {
+    super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+  }
+
   public MetadataBlock(DataSchema dataSchema) {
     super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 327fa241aa..f673d2433d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -48,8 +48,7 @@ public class DataBlockTest {
 
     BaseDataBlock dataBlock = 
DataBlockUtils.getErrorDataBlock(originalException);
     dataBlock.addException(processingException);
-    Assert.assertEquals(dataBlock.getDataSchema().getColumnNames().length, 0);
-    Assert.assertEquals(dataBlock.getDataSchema().getColumnDataTypes().length, 
0);
+    Assert.assertNull(dataBlock.getDataSchema());
     Assert.assertEquals(dataBlock.getNumberOfRows(), 0);
 
     // Assert processing exception and original exception both matches.
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
index 9f4778743b..143eabf0f0 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
@@ -55,6 +55,16 @@ public class QueryEnvironmentTest extends 
QueryEnvironmentTestBase {
     }
   }
 
+  @Test(dataProvider = "testQueryExceptionDataProvider")
+  public void testQueryWithException(String query, String exceptionSnippet) {
+    try {
+      _queryEnvironment.planQuery(query);
+      Assert.fail("query plan should throw exception");
+    } catch (RuntimeException e) {
+      Assert.assertTrue(e.getCause().getMessage().contains(exceptionSnippet));
+    }
+  }
+
   @Test
   public void testQueryAndAssertStageContentForJoin()
       throws Exception {
@@ -100,4 +110,14 @@ public class QueryEnvironmentTest extends 
QueryEnvironmentTestBase {
             "SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` = 
`b`.`col2`\n" + "WHERE `a`.`col3` >= 0"},
     };
   }
+
+  @DataProvider(name = "testQueryExceptionDataProvider")
+  private Object[][] provideQueriesWithException() {
+    return new Object[][] {
+        // wrong table is being used after JOIN
+        new Object[]{"SELECT b.col1 - a.col3 FROM a JOIN c ON a.col1 = 
c.col3", "Table 'b' not found"},
+        // non-agg column not being grouped
+        new Object[]{"SELECT a.col1, SUM(a.col3) FROM a", "'a.col1' is not 
being grouped"},
+    };
+  }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index a146f5ca3d..1c6ad7ff29 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -56,10 +56,6 @@ public class QueryEnvironmentTestBase {
             + " GROUP BY a.col1, a.col2"},
         new Object[]{"SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = 
b.col2 "
             + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY 
a.col1"},
-        new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON 
MOD(a.col3, 2) = MOD(b.col3, 2)"},
-        new Object[]{"SELECT a.col1, a.col3, i.maxVal FROM a JOIN "
-            + "  (SELECT b.col1 AS joinKey, MAX(b.col3) AS maxVal FROM b GROUP 
BY b.col1) AS i "
-            + "  ON a.col1 = i.joinKey"},
     };
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
new file mode 100644
index 0000000000..1662047263
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+public class ChannelUtils {
+
+  public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
+  public static final String MAILBOX_METADATA_END_OF_STREAM_KEY = 
"end.of.stream";
+
+  private ChannelUtils() {
+    // do not instantiate.
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 8aeb560c8f..5b79549a27 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -91,9 +91,9 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
       int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
       Mailbox.MailboxStatus.Builder builder =
           
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
-              .putMetadata("buffer.size", String.valueOf(remainingCapacity));
-      if (mailboxContent.getMetadataMap().get("finished") != null) {
-        builder.putMetadata("finished", "true");
+              .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, 
String.valueOf(remainingCapacity));
+      if 
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
 != null) {
+        builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true");
       }
       Mailbox.MailboxStatus status = builder.build();
       // returns the buffer available size to sender for rate controller / 
throttling.
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index 45233952e2..e482583bd3 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -63,8 +63,9 @@ public class MailboxStatusStreamObserver implements 
StreamObserver<Mailbox.Mailb
     // when received a mailbox status from the receiving end, sending end 
update the known buffer size available
     // so we can make better throughput send judgement. here is a simple 
example.
     // TODO: this feedback info is not used to throttle the send speed. it is 
currently being discarded.
-    if (mailboxStatus.getMetadataMap().containsKey("buffer.size")) {
-      
_bufferSize.set(Integer.parseInt(mailboxStatus.getMetadataMap().get("buffer.size")));
+    if 
(mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY))
 {
+      _bufferSize.set(Integer.parseInt(
+          
mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)));
     } else {
       _bufferSize.set(DEFAULT_MAILBOX_QUEUE_CAPACITY); // DEFAULT_AVAILABILITY;
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index cf56c5b786..2f7f23f264 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pinot.query.runtime;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -36,6 +40,7 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -101,25 +106,84 @@ public class QueryRunner {
       BaseDataBlock dataBlock;
       try {
         DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, 
executorService, null);
-        // this works because default DataTableImplV3 will have a version 
number at beginning,
-        // which maps to ROW type of version 3.
-        dataBlock = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
-      } catch (IOException e) {
-        throw new RuntimeException("Unable to convert byte buffer", e);
+        if (!dataTable.getExceptions().isEmpty()) {
+          // if contains exception, directly return a metadata block with the 
exceptions.
+          dataBlock = 
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+        } else {
+          // this works because default DataTableImplV3 will have a version 
number at beginning:
+          // the new DataBlock encodes lower 16 bits as version and upper 16 
bits as type (ROW, COLUMNAR, METADATA)
+          dataBlock = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+        }
+      } catch (Exception e) {
+        dataBlock = DataBlockUtils.getErrorDataBlock(e);
       }
 
       MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = 
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
       MailboxSendOperator mailboxSendOperator =
-          new MailboxSendOperator(_mailboxService, dataBlock, 
receivingStageMetadata.getServerInstances(),
-              sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), 
_hostname, _port,
-              serverQueryRequest.getRequestId(), sendNode.getStageId());
+          new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
+              new LeafStageTransferableBlockOperator(dataBlock, 
sendNode.getDataSchema()),
+              receivingStageMetadata.getServerInstances(), 
sendNode.getExchangeType(),
+              sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequest.getRequestId(),
+              sendNode.getStageId());
       mailboxSendOperator.nextBlock();
+      if (dataBlock.getExceptions().isEmpty()) {
+        mailboxSendOperator.nextBlock();
+      }
     } else {
       _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, 
executorService);
     }
   }
 
+  /**
+   * Leaf-stage transfer block operator is used to wrap around the leaf stage 
process results. They are passed to the
+   * Pinot server to execute the query, thus only one {@link DataTable} is 
returned. However, to conform with the
+   * intermediate stage operators, an additional {@link MetadataBlock} needs 
to be transferred after the data block.
+   *
+   * <p>In order to achieve this:
+   * <ul>
+   *   <li>The leaf-stage result is split into data payload block and metadata 
payload block.</li>
+   *   <li>In case the leaf-stage result contains error or only metadata, we 
skip the data payload block.</li>
+   * </ul>
+   */
+  private static class LeafStageTransferableBlockOperator extends 
BaseOperator<TransferableBlock> {
+    private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
+
+    private final MetadataBlock _endOfStreamBlock;
+    private final BaseDataBlock _baseDataBlock;
+    private final DataSchema _dataSchema;
+    private boolean _hasTransferred;
+
+    private LeafStageTransferableBlockOperator(BaseDataBlock baseDataBlock, 
DataSchema dataSchema) {
+      _baseDataBlock = baseDataBlock;
+      _dataSchema = dataSchema;
+      _endOfStreamBlock = baseDataBlock.getExceptions().isEmpty()
+          ? DataBlockUtils.getEndOfStreamDataBlock(dataSchema) : null;
+      _hasTransferred = false;
+    }
+
+    @Override
+    public List<Operator> getChildOperators() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public String toExplainString() {
+      return EXPLAIN_NAME;
+    }
+
+    @Override
+    protected TransferableBlock getNextBlock() {
+      if (!_hasTransferred) {
+        _hasTransferred = true;
+        return new TransferableBlock(_baseDataBlock);
+      } else {
+        return new TransferableBlock(_endOfStreamBlock);
+      }
+    }
+  }
+
   private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
     int stageId = distributedStagePlan.getStageId();
     ServerInstance serverInstance = distributedStagePlan.getServerInstance();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 79470fed8c..775a5de6ef 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
@@ -41,15 +42,22 @@ public class TransferableBlock implements Block {
 
   private final BaseDataBlock.Type _type;
   private final DataSchema _dataSchema;
+  private final boolean _isErrorBlock;
 
   private BaseDataBlock _dataBlock;
-
   private List<Object[]> _container;
 
   public TransferableBlock(List<Object[]> container, DataSchema dataSchema, 
BaseDataBlock.Type containerType) {
+    this(container, dataSchema, containerType, false);
+  }
+
+  @VisibleForTesting
+  TransferableBlock(List<Object[]> container, DataSchema dataSchema, 
BaseDataBlock.Type containerType,
+      boolean isErrorBlock) {
     _container = container;
     _dataSchema = dataSchema;
     _type = containerType;
+    _isErrorBlock = isErrorBlock;
   }
 
   public TransferableBlock(BaseDataBlock dataBlock) {
@@ -57,12 +65,20 @@ public class TransferableBlock implements Block {
     _dataSchema = dataBlock.getDataSchema();
     _type = dataBlock instanceof ColumnarDataBlock ? 
BaseDataBlock.Type.COLUMNAR
         : dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : 
BaseDataBlock.Type.METADATA;
+    _isErrorBlock = !_dataBlock.getExceptions().isEmpty();
   }
 
   public DataSchema getDataSchema() {
     return _dataSchema;
   }
 
+  /**
+   * Retrieve the extracted {@link TransferableBlock#_container} of the 
transferable block.
+   * If not already constructed. It will use {@link DataBlockUtils} to extract 
the row/columnar data from the
+   * binary-packed format.
+   *
+   * @return data container.
+   */
   public List<Object[]> getContainer() {
     if (_container == null) {
       switch (_type) {
@@ -77,6 +93,13 @@ public class TransferableBlock implements Block {
     return _container;
   }
 
+  /**
+   * Retrieve the binary-packed version of the data block.
+   * If not already constructed. It will use {@link DataBlockBuilder} to 
construct the binary-packed format from
+   * the {@link TransferableBlock#_container}.
+   *
+   * @return data block.
+   */
   public BaseDataBlock getDataBlock() {
     if (_dataBlock == null) {
       try {
@@ -87,6 +110,8 @@ public class TransferableBlock implements Block {
           case COLUMNAR:
             _dataBlock = DataBlockBuilder.buildFromColumns(_container, null, 
_dataSchema);
             break;
+          case METADATA:
+            throw new UnsupportedOperationException("Metadata block cannot be 
constructed from container");
           default:
             throw new UnsupportedOperationException("Unable to build from 
container with type: " + _type);
         }
@@ -97,10 +122,38 @@ public class TransferableBlock implements Block {
     return _dataBlock;
   }
 
+  /**
+   * Return the type of block (one of ROW, COLUMNAR, or METADATA).
+   *
+   * @return return type of block
+   */
   public BaseDataBlock.Type getType() {
     return _type;
   }
 
+  /**
+   * Return whether a transferable block is at the end of a stream.
+   *
+   * <p>End of stream is different from data block with 0-rows. which can 
indicate that one partition of the execution
+   * returns no rows. but that doesn't mean the rest of the partitions are 
also finished.
+   * <p>When an exception is caught within a stream, no matter how many 
outstanding data is pending to be received,
+   * it is considered end of stream because the exception should bubble up 
immediately.
+   *
+   * @return whether this block is the end of stream.
+   */
+  public boolean isEndOfStreamBlock() {
+    return _type == BaseDataBlock.Type.METADATA;
+  }
+
+  /**
+   * Return whether a transferable block contains exception.
+   *
+   * @return true if contains exception.
+   */
+  public boolean isErrorBlock() {
+    return _isErrorBlock;
+  }
+
   public byte[] toBytes()
       throws IOException {
     return _dataBlock.toBytes();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index f2992c41df..4900ec193f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
+import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
 
 
@@ -28,24 +27,20 @@ public final class TransferableBlockUtils {
   private TransferableBlockUtils() {
     // do not instantiate.
   }
-  private static final TransferableBlock EOS_TRANSFERABLE_BLOCK =
-      new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
 
-  public static TransferableBlock getEndOfStreamTransferableBlock() {
-    return EOS_TRANSFERABLE_BLOCK;
+  public static TransferableBlock getEndOfStreamTransferableBlock(DataSchema 
dataSchema) {
+    return new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(dataSchema));
   }
 
   public static TransferableBlock getErrorTransferableBlock(Exception e) {
     return new TransferableBlock(DataBlockUtils.getErrorDataBlock(e));
   }
 
-  public static TransferableBlock getEmptyTransferableBlock(DataSchema 
dataSchema) {
-    return new TransferableBlock(DataBlockUtils.getEmptyDataBlock(dataSchema));
+  public static TransferableBlock getErrorTransferableBlock(Map<Integer, 
String> exceptions) {
+    return new TransferableBlock(DataBlockUtils.getErrorDataBlock(exceptions));
   }
 
   public static boolean isEndOfStream(TransferableBlock transferableBlock) {
-    return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
-        && 
"END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
-        .get(DataTable.MetadataKey.TABLE.getName()));
+    return transferableBlock.isEndOfStreamBlock();
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index afe8617830..9c978f0ad9 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -100,38 +100,39 @@ public class WorkerQueryExecutor {
   }
 
   // TODO: split this PhysicalPlanner into a separate module
+  // TODO: optimize this into a framework. (physical planner)
   private BaseOperator<TransferableBlock> getOperator(long requestId, 
StageNode stageNode,
       Map<Integer, StageMetadata> metadataMap) {
-    // TODO: optimize this into a framework. (physical planner)
     if (stageNode instanceof MailboxReceiveNode) {
       MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
       List<ServerInstance> sendingInstances = 
metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
-      return new MailboxReceiveOperator(_mailboxService, 
RelDistribution.Type.ANY, sendingInstances, _hostName, _port,
-          requestId, receiveNode.getSenderStageId());
+      return new MailboxReceiveOperator(_mailboxService, 
receiveNode.getDataSchema(), RelDistribution.Type.ANY,
+          sendingInstances, _hostName, _port, requestId, 
receiveNode.getSenderStageId());
     } else if (stageNode instanceof MailboxSendNode) {
       MailboxSendNode sendNode = (MailboxSendNode) stageNode;
       BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, 
sendNode.getInputs().get(0), metadataMap);
       StageMetadata receivingStageMetadata = 
metadataMap.get(sendNode.getReceiverStageId());
-      return new MailboxSendOperator(_mailboxService, nextOperator, 
receivingStageMetadata.getServerInstances(),
-          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), 
_hostName, _port, requestId,
-          sendNode.getStageId());
+      return new MailboxSendOperator(_mailboxService, 
sendNode.getDataSchema(), nextOperator,
+          receivingStageMetadata.getServerInstances(), 
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
+          _hostName, _port, requestId, sendNode.getStageId());
     } else if (stageNode instanceof JoinNode) {
       JoinNode joinNode = (JoinNode) stageNode;
       BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, 
joinNode.getInputs().get(0), metadataMap);
       BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, 
joinNode.getInputs().get(1), metadataMap);
-      return new HashJoinOperator(leftOperator, rightOperator, 
joinNode.getCriteria());
+      return new HashJoinOperator(leftOperator, 
joinNode.getInputs().get(0).getDataSchema(), rightOperator,
+          joinNode.getInputs().get(1).getDataSchema(), 
joinNode.getDataSchema(), joinNode.getCriteria());
     } else if (stageNode instanceof AggregateNode) {
       AggregateNode aggregateNode = (AggregateNode) stageNode;
       BaseOperator<TransferableBlock> inputOperator =
           getOperator(requestId, aggregateNode.getInputs().get(0), 
metadataMap);
-      return new AggregateOperator(inputOperator, aggregateNode.getAggCalls(), 
aggregateNode.getGroupSet(),
-          aggregateNode.getInputs().get(0).getDataSchema());
+      return new AggregateOperator(inputOperator, 
aggregateNode.getDataSchema(), aggregateNode.getAggCalls(),
+          aggregateNode.getGroupSet(), 
aggregateNode.getInputs().get(0).getDataSchema());
     } else if (stageNode instanceof FilterNode) {
       throw new UnsupportedOperationException("Unsupported!");
     } else if (stageNode instanceof ProjectNode) {
       ProjectNode projectNode = (ProjectNode) stageNode;
       return new TransformOperator(getOperator(requestId, 
projectNode.getInputs().get(0), metadataMap),
-          projectNode.getProjects(), 
projectNode.getInputs().get(0).getDataSchema());
+          projectNode.getDataSchema(), projectNode.getProjects(), 
projectNode.getInputs().get(0).getDataSchema());
     } else {
       throw new UnsupportedOperationException(
           String.format("Stage node type %s is not supported!", 
stageNode.getClass().getSimpleName()));
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 1949cee595..200336ed77 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -29,8 +29,6 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -61,15 +59,17 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
   private final Map<Integer, Object[]> _groupByKeyHolder;
 
   private DataSchema _upstreamDataSchema;
+  private TransferableBlock _upstreamErrorBlock;
   private boolean _isCumulativeBlockConstructed;
 
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg 
operator.
-  public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, 
List<RexExpression> aggCalls,
-      List<RexExpression> groupSet, DataSchema upstreamDataSchema) {
+  public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, 
DataSchema dataSchema,
+      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema 
upstreamDataSchema) {
     _inputOperator = inputOperator;
     _aggCalls = aggCalls;
     _groupSet = groupSet;
     _upstreamDataSchema = upstreamDataSchema;
+    _upstreamErrorBlock = null;
 
     _aggregationFunctions = new AggregationFunction[_aggCalls.size()];
     _aggregationFunctionInputRefs = new int[_aggCalls.size()];
@@ -80,20 +80,7 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
       _aggregationFunctions[i] = toAggregationFunction(aggCalls.get(i), 
_aggregationFunctionInputRefs[i]);
       _groupByResultHolders[i] = new HashMap<Integer, Object>();
     }
-
-    String[] columnNames = new String[_groupSet.size() + _aggCalls.size()];
-    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[_groupSet.size() + _aggCalls.size()];
-    for (int i = 0; i < _groupSet.size(); i++) {
-      int idx = ((RexExpression.InputRef) groupSet.get(i)).getIndex();
-      columnNames[i] = _upstreamDataSchema.getColumnName(idx);
-      columnDataTypes[i] = _upstreamDataSchema.getColumnDataType(idx);
-    }
-    for (int i = 0; i < _aggCalls.size(); i++) {
-      int idx = i + _groupSet.size();
-      columnNames[idx] = _aggregationFunctions[i].getColumnName();
-      columnDataTypes[idx] = 
_aggregationFunctions[i].getFinalResultColumnType();
-    }
-    _resultSchema = new DataSchema(columnNames, columnDataTypes);
+    _resultSchema = dataSchema;
 
     _isCumulativeBlockConstructed = false;
   }
@@ -119,15 +106,18 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
   @Override
   protected TransferableBlock getNextBlock() {
     try {
-      cumulateAggregationBlocks();
-      return new TransferableBlock(toResultBlock());
+      consumeInputBlocks();
+      return produceAggregatedBlock();
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
   }
 
-  private BaseDataBlock toResultBlock()
+  private TransferableBlock produceAggregatedBlock()
       throws IOException {
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    }
     if (!_isCumulativeBlockConstructed) {
       List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
       for (Map.Entry<Integer, Object[]> e : _groupByKeyHolder.entrySet()) {
@@ -143,36 +133,42 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
       }
       _isCumulativeBlockConstructed = true;
       if (rows.size() == 0) {
-        return DataBlockUtils.getEmptyDataBlock(_resultSchema);
+        return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
       } else {
-        return DataBlockBuilder.buildFromRows(rows, null, _resultSchema);
+        return new TransferableBlock(rows, _resultSchema, 
BaseDataBlock.Type.ROW);
       }
     } else {
-      return DataBlockUtils.getEndOfStreamDataBlock();
+      return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
     }
   }
 
-  private void cumulateAggregationBlocks() {
-    TransferableBlock block = _inputOperator.nextBlock();
-    while (!TransferableBlockUtils.isEndOfStream(block)) {
-      BaseDataBlock dataBlock = block.getDataBlock();
-      int numRows = dataBlock.getNumberOfRows();
-      for (int rowId = 0; rowId < numRows; rowId++) {
-        Object[] row = 
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
-        Key key = extraRowKey(row, _groupSet);
-        int keyHashCode = key.hashCode();
-        _groupByKeyHolder.put(keyHashCode, key.getValues());
-        for (int i = 0; i < _aggregationFunctions.length; i++) {
-          Object currentRes = _groupByResultHolders[i].get(keyHashCode);
-          if (currentRes == null) {
-            _groupByResultHolders[i].put(keyHashCode, 
row[_aggregationFunctionInputRefs[i]]);
-          } else {
-            _groupByResultHolders[i].put(keyHashCode,
-                merge(_aggCalls.get(i), currentRes, 
row[_aggregationFunctionInputRefs[i]]));
+  private void consumeInputBlocks() {
+    if (!_isCumulativeBlockConstructed) {
+      TransferableBlock block = _inputOperator.nextBlock();
+      while (!TransferableBlockUtils.isEndOfStream(block)) {
+        BaseDataBlock dataBlock = block.getDataBlock();
+        int numRows = dataBlock.getNumberOfRows();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object[] row = 
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+          Key key = extraRowKey(row, _groupSet);
+          int keyHashCode = key.hashCode();
+          _groupByKeyHolder.put(keyHashCode, key.getValues());
+          for (int i = 0; i < _aggregationFunctions.length; i++) {
+            Object currentRes = _groupByResultHolders[i].get(keyHashCode);
+            if (currentRes == null) {
+              _groupByResultHolders[i].put(keyHashCode, 
row[_aggregationFunctionInputRefs[i]]);
+            } else {
+              _groupByResultHolders[i].put(keyHashCode,
+                  merge(_aggCalls.get(i), currentRes, 
row[_aggregationFunctionInputRefs[i]]));
+            }
           }
         }
+        block = _inputOperator.nextBlock();
+      }
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
       }
-      block = _inputOperator.nextBlock();
     }
   }
 
@@ -212,7 +208,7 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
         return ((Number) left).doubleValue() + ((Number) right).doubleValue();
       case "COUNT":
       case "$COUNT":
-        return (int) left + (int) right;
+        return ((Number) left).longValue() + ((Number) right).longValue();
       case "MIN":
       case "$MIN":
       case "$MIN0":
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 5c5ac00c0f..ed9743e88a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
@@ -47,22 +48,29 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
   private final BaseOperator<TransferableBlock> _leftTableOperator;
   private final BaseOperator<TransferableBlock> _rightTableOperator;
-
-  private DataSchema _leftTableSchema;
-  private DataSchema _rightTableSchema;
-  private int _resultRowSize;
+  private final DataSchema _resultSchema;
+  private final DataSchema _leftTableSchema;
+  private final DataSchema _rightTableSchema;
+  private final int _resultRowSize;
   private boolean _isHashTableBuilt;
+  private TransferableBlock _upstreamErrorBlock;
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
-  public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator,
-      BaseOperator<TransferableBlock> rightTableOperator, 
List<JoinNode.JoinClause> criteria) {
+  public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, 
DataSchema leftSchema,
+      BaseOperator<TransferableBlock> rightTableOperator, DataSchema 
rightSchema, DataSchema outputSchema,
+      List<JoinNode.JoinClause> criteria) {
     _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
     _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
     _leftTableOperator = leftTableOperator;
     _rightTableOperator = rightTableOperator;
+    _resultSchema = outputSchema;
+    _leftTableSchema = leftSchema;
+    _rightTableSchema = rightSchema;
+    _resultRowSize = _resultSchema.size();
     _isHashTableBuilt = false;
     _broadcastHashTable = new HashMap<>();
+    _upstreamErrorBlock = null;
   }
 
   @Override
@@ -79,7 +87,12 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
 
   @Override
   protected TransferableBlock getNextBlock() {
+    // Build JOIN hash table
     buildBroadcastHashTable();
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    }
+    // JOIN each left block with the right block.
     try {
       return buildJoinedDataBlock(_leftTableOperator.nextBlock());
     } catch (Exception e) {
@@ -91,7 +104,6 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
     if (!_isHashTableBuilt) {
       TransferableBlock rightBlock = _rightTableOperator.nextBlock();
       while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
-        _rightTableSchema = rightBlock.getDataSchema();
         List<Object[]> container = rightBlock.getContainer();
         // put all the rows into corresponding hash collections keyed by the 
key selector function.
         for (Object[] row : container) {
@@ -101,27 +113,32 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
         }
         rightBlock = _rightTableOperator.nextBlock();
       }
+      if (rightBlock.isErrorBlock()) {
+        _upstreamErrorBlock = rightBlock;
+      }
       _isHashTableBuilt = true;
     }
   }
 
   private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
       throws Exception {
-    if (TransferableBlockUtils.isEndOfStream(leftBlock)) {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-    }
-    List<Object[]> rows = new ArrayList<>();
-    _leftTableSchema = leftBlock.getDataSchema();
-    _resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
-    List<Object[]> container = leftBlock.getContainer();
-    for (Object[] leftRow : container) {
-      List<Object[]> hashCollection =
-          
_broadcastHashTable.getOrDefault(_leftKeySelector.computeHash(leftRow), 
Collections.emptyList());
-      for (Object[] rightRow : hashCollection) {
-        rows.add(joinRow(leftRow, rightRow));
+    if (!TransferableBlockUtils.isEndOfStream(leftBlock)) {
+      List<Object[]> rows = new ArrayList<>();
+      List<Object[]> container = leftBlock.getContainer();
+      for (Object[] leftRow : container) {
+        List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
+            _leftKeySelector.computeHash(leftRow), Collections.emptyList());
+        for (Object[] rightRow : hashCollection) {
+          rows.add(joinRow(leftRow, rightRow));
+        }
       }
+      return new TransferableBlock(rows, computeSchema(), 
BaseDataBlock.Type.ROW);
+    } else if (leftBlock.isErrorBlock()) {
+      _upstreamErrorBlock = leftBlock;
+      return _upstreamErrorBlock;
+    } else {
+      return new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
     }
-    return new TransferableBlock(rows, computeSchema(), 
BaseDataBlock.Type.ROW);
   }
 
   private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 0b8d88cd00..014cf49bed 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -50,14 +51,17 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
   private final MailboxService<Mailbox.MailboxContent> _mailboxService;
   private final RelDistribution.Type _exchangeType;
   private final List<ServerInstance> _sendingStageInstances;
+  private final DataSchema _dataSchema;
   private final String _hostName;
   private final int _port;
   private final long _jobId;
   private final int _stageId;
+  private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService,
+  public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService, DataSchema dataSchema,
       RelDistribution.Type exchangeType, List<ServerInstance> 
sendingStageInstances, String hostName, int port,
       long jobId, int stageId) {
+    _dataSchema = dataSchema;
     _mailboxService = mailboxService;
     _exchangeType = exchangeType;
     _sendingStageInstances = sendingStageInstances;
@@ -65,6 +69,7 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
     _port = port;
     _jobId = jobId;
     _stageId = stageId;
+    _upstreamErrorBlock = null;
   }
 
   @Override
@@ -81,9 +86,11 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
 
   @Override
   protected TransferableBlock getNextBlock() {
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    }
     // TODO: do a round robin check against all MailboxContentStreamObservers 
and find which one that has data.
     boolean hasOpenedMailbox = true;
-    DataSchema dataSchema = null;
     long timeoutWatermark = System.nanoTime() + DEFAULT_TIMEOUT_NANO;
     while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
       hasOpenedMailbox = false;
@@ -100,6 +107,10 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
               ByteBuffer byteBuffer = 
mailboxContent.getPayload().asReadOnlyByteBuffer();
               if (byteBuffer.hasRemaining()) {
                 BaseDataBlock dataBlock = 
DataBlockUtils.getDataBlock(byteBuffer);
+                if (dataBlock instanceof MetadataBlock && 
!dataBlock.getExceptions().isEmpty()) {
+                  _upstreamErrorBlock = 
TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+                  return _upstreamErrorBlock;
+                }
                 if (dataBlock.getNumberOfRows() > 0) {
                   // here we only return data table block when it is not empty.
                   return new TransferableBlock(dataBlock);
@@ -117,7 +128,7 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
     }
     // TODO: we need to at least return one data table with schema if there's 
no error.
     // we need to condition this on whether there's already things being 
returned or not.
-    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
   }
 
   public RelDistribution.Type getExchangeType() {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 632c617d4e..bcd49f0b9a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -24,21 +24,23 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.proto.Mailbox;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -55,6 +57,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, 
RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, 
RelDistribution.Type.HASH_DISTRIBUTED);
+  private static final Random RANDOM = new Random();
 
   private final List<ServerInstance> _receivingStageInstances;
   private final RelDistribution.Type _exchangeType;
@@ -64,13 +67,14 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
   private final long _jobId;
   private final int _stageId;
   private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final DataSchema _dataSchema;
   private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
-  private BaseDataBlock _dataTable;
 
-  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService,
+  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService, DataSchema dataSchema,
       BaseOperator<TransferableBlock> dataTableBlockBaseOperator, 
List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector, String hostName, int port,
       long jobId, int stageId) {
+    _dataSchema = dataSchema;
     _mailboxService = mailboxService;
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
     _receivingStageInstances = receivingStageInstances;
@@ -84,25 +88,6 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
         String.format("Exchange type '%s' is not supported yet", 
_exchangeType));
   }
 
-  /**
-   * This is a temporary interface for connecting with server API. 
remove/merge with InstanceResponseOperator once
-   * we create a {@link 
org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl} that can handle 
the
-   * creation of MailboxSendOperator we should not use this API.
-   */
-  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService, BaseDataBlock dataTable,
-      List<ServerInstance> receivingStageInstances, RelDistribution.Type 
exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, 
long jobId, int stageId) {
-    _mailboxService = mailboxService;
-    _dataTable = dataTable;
-    _receivingStageInstances = receivingStageInstances;
-    _exchangeType = exchangeType;
-    _keySelector = keySelector;
-    _serverHostName = hostName;
-    _serverPort = port;
-    _jobId = jobId;
-    _stageId = stageId;
-  }
-
   @Override
   public List<Operator> getChildOperators() {
     // WorkerExecutor doesn't use getChildOperators, returns null here.
@@ -117,43 +102,40 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    BaseDataBlock dataTable;
+    BaseDataBlock dataBlock;
     TransferableBlock transferableBlock = null;
     boolean isEndOfStream;
-    if (_dataTableBlockBaseOperator != null) {
-      transferableBlock = _dataTableBlockBaseOperator.nextBlock();
-      dataTable = transferableBlock.getDataBlock();
-      isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
-    } else {
-      dataTable = _dataTable;
-      isEndOfStream = true;
-    }
+    transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+    dataBlock = transferableBlock.getDataBlock();
+    isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
 
     try {
       switch (_exchangeType) {
-        // TODO: random and singleton distribution should've been selected in 
planning phase.
         case SINGLETON:
+          // TODO: singleton or random distribution should've been 
distinguished in planning phase.
         case RANDOM_DISTRIBUTED:
-          // TODO: make random distributed actually random, this impl only 
sends data to the first instances.
-          for (ServerInstance serverInstance : _receivingStageInstances) {
-            sendDataTableBlock(serverInstance, dataTable, isEndOfStream);
-            // we no longer need to send data to the rest of the receiving 
instances, but we still need to transfer
-            // the dataTable over indicating that we are a potential sender. 
thus next time a random server is selected
-            // it might still be useful.
-            dataTable = 
DataBlockUtils.getEmptyDataBlock(dataTable.getDataSchema());
+          if (isEndOfStream) {
+            for (ServerInstance serverInstance : _receivingStageInstances) {
+              sendDataTableBlock(serverInstance, dataBlock);
+            }
+          } else {
+            int randomInstanceIdx = _exchangeType == 
RelDistribution.Type.SINGLETON ? 0
+                : RANDOM.nextInt(_receivingStageInstances.size());
+            ServerInstance randomInstance = 
_receivingStageInstances.get(randomInstanceIdx);
+            sendDataTableBlock(randomInstance, dataBlock);
           }
           break;
         case BROADCAST_DISTRIBUTED:
           for (ServerInstance serverInstance : _receivingStageInstances) {
-            sendDataTableBlock(serverInstance, dataTable, isEndOfStream);
+            sendDataTableBlock(serverInstance, dataBlock);
           }
           break;
         case HASH_DISTRIBUTED:
           // TODO: ensure that server instance list is sorted using same 
function in sender.
-          List<BaseDataBlock> dataTableList = 
constructPartitionedDataBlock(dataTable, _keySelector,
-              _receivingStageInstances.size());
+          List<BaseDataBlock> dataTableList = 
constructPartitionedDataBlock(dataBlock, _keySelector,
+              _receivingStageInstances.size(), isEndOfStream);
           for (int i = 0; i < _receivingStageInstances.size(); i++) {
-            sendDataTableBlock(_receivingStageInstances.get(i), 
dataTableList.get(i), isEndOfStream);
+            sendDataTableBlock(_receivingStageInstances.get(i), 
dataTableList.get(i));
           }
           break;
         case RANGE_DISTRIBUTED:
@@ -168,43 +150,51 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
     return transferableBlock;
   }
 
-  private static List<BaseDataBlock> constructPartitionedDataBlock(DataTable 
dataTable,
-      KeySelector<Object[], Object[]> keySelector, int partitionSize)
+  private static List<BaseDataBlock> 
constructPartitionedDataBlock(BaseDataBlock dataBlock,
+      KeySelector<Object[], Object[]> keySelector, int partitionSize, boolean 
isEndOfStream)
       throws Exception {
-    List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
-    for (int i = 0; i < partitionSize; i++) {
-      temporaryRows.add(new ArrayList<>());
-    }
-    for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
-      Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, 
rowId);
-      int partitionId = keySelector.computeHash(row) % partitionSize;
-      temporaryRows.get(partitionId).add(row);
-    }
-    List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
-    for (int i = 0; i < partitionSize; i++) {
-      List<Object[]> objects = temporaryRows.get(i);
-      dataTableList.add(DataBlockBuilder.buildFromRows(objects, null, 
dataTable.getDataSchema()));
+    if (isEndOfStream) {
+      List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
+      for (int i = 0; i < partitionSize; i++) {
+        dataTableList.add(dataBlock);
+      }
+      return dataTableList;
+    } else {
+      List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
+      for (int i = 0; i < partitionSize; i++) {
+        temporaryRows.add(new ArrayList<>());
+      }
+      for (int rowId = 0; rowId < dataBlock.getNumberOfRows(); rowId++) {
+        Object[] row = 
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+        int partitionId = keySelector.computeHash(row) % partitionSize;
+        temporaryRows.get(partitionId).add(row);
+      }
+      List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
+      for (int i = 0; i < partitionSize; i++) {
+        List<Object[]> objects = temporaryRows.get(i);
+        dataTableList.add(DataBlockBuilder.buildFromRows(objects, null, 
dataBlock.getDataSchema()));
+      }
+      return dataTableList;
     }
-    return dataTableList;
   }
 
-  private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock 
dataTable, boolean isEndOfStream)
+  private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock 
dataBlock)
       throws IOException {
     String mailboxId = toMailboxId(serverInstance);
     SendingMailbox<Mailbox.MailboxContent> sendingMailbox = 
_mailboxService.getSendingMailbox(mailboxId);
-    Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, 
dataTable, isEndOfStream);
+    Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, 
dataBlock);
     sendingMailbox.send(mailboxContent);
-    if (mailboxContent.getMetadataMap().containsKey("finished")) {
+    if 
(mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY))
 {
       sendingMailbox.complete();
     }
   }
 
-  private Mailbox.MailboxContent toMailboxContent(String mailboxId, 
BaseDataBlock dataTable, boolean isEndOfStream)
+  private Mailbox.MailboxContent toMailboxContent(String mailboxId, 
BaseDataBlock dataBlock)
       throws IOException {
     Mailbox.MailboxContent.Builder builder = 
Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
-        .setPayload(ByteString.copyFrom(new 
TransferableBlock(dataTable).toBytes()));
-    if (isEndOfStream) {
-      builder.putMetadata("finished", "true");
+        .setPayload(ByteString.copyFrom(new 
TransferableBlock(dataBlock).toBytes()));
+    if (dataBlock instanceof MetadataBlock) {
+      builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true");
     }
     return builder.build();
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 6744e35d99..7c2369e6c0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.function.FunctionUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -44,22 +45,17 @@ public class TransformOperator extends 
BaseOperator<TransferableBlock> {
   private final List<TransformOperands> _transformOperandsList;
   private final int _resultColumnSize;
   private final DataSchema _resultSchema;
+  private TransferableBlock _upstreamErrorBlock;
 
-  public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, 
List<RexExpression> transforms,
-      DataSchema upstreamDataSchema) {
+  public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema,
+      List<RexExpression> transforms, DataSchema upstreamDataSchema) {
     _upstreamOperator = upstreamOperator;
     _resultColumnSize = transforms.size();
     _transformOperandsList = new ArrayList<>(_resultColumnSize);
     for (RexExpression rexExpression : transforms) {
       
_transformOperandsList.add(TransformOperands.toFunctionOperands(rexExpression, 
upstreamDataSchema));
     }
-    String[] columnNames = new String[_resultColumnSize];
-    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[_resultColumnSize];
-    for (int i = 0; i < _resultColumnSize; i++) {
-      columnNames[i] = _transformOperandsList.get(i).getResultName();
-      columnDataTypes[i] = _transformOperandsList.get(i).getResultType();
-    }
-    _resultSchema = new DataSchema(columnNames, columnDataTypes);
+    _resultSchema = dataSchema;
   }
 
   @Override
@@ -85,19 +81,26 @@ public class TransformOperator extends 
BaseOperator<TransferableBlock> {
 
   private TransferableBlock transform(TransferableBlock block)
       throws Exception {
-    if (TransferableBlockUtils.isEndOfStream(block)) {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
     }
-    List<Object[]> resultRows = new ArrayList<>();
-    List<Object[]> container = block.getContainer();
-    for (Object[] row : container) {
-      Object[] resultRow = new Object[_resultColumnSize];
-      for (int i = 0; i < _resultColumnSize; i++) {
-        resultRow[i] = _transformOperandsList.get(i).apply(row);
+    if (!TransferableBlockUtils.isEndOfStream(block)) {
+      List<Object[]> resultRows = new ArrayList<>();
+      List<Object[]> container = block.getContainer();
+      for (Object[] row : container) {
+        Object[] resultRow = new Object[_resultColumnSize];
+        for (int i = 0; i < _resultColumnSize; i++) {
+          resultRow[i] = _transformOperandsList.get(i).apply(row);
+        }
+        resultRows.add(resultRow);
       }
-      resultRows.add(resultRow);
+      return new TransferableBlock(resultRows, _resultSchema, 
BaseDataBlock.Type.ROW);
+    } else if (block.isErrorBlock()) {
+      _upstreamErrorBlock = block;
+      return _upstreamErrorBlock;
+    } else {
+      return new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
     }
-    return new TransferableBlock(resultRows, _resultSchema, 
BaseDataBlock.Type.ROW);
   }
 
   private static abstract class TransformOperands {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index f0236eb7ff..70989caca0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -64,7 +65,7 @@ public class QueryDispatcher {
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(reduceStageId);
     MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(mailboxService,
         
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
-        requestId, reduceNode.getSenderStageId(), mailboxService.getHostname(),
+        requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
mailboxService.getHostname(),
         mailboxService.getMailboxPort());
     return reduceMailboxReceive(mailboxReceiveOperator);
   }
@@ -118,6 +119,12 @@ public class QueryDispatcher {
     while (true) {
       transferableBlock = mailboxReceiveOperator.nextBlock();
       if (TransferableBlockUtils.isEndOfStream(transferableBlock)) {
+        // TODO: we currently only receive bubbled-up errors from the execution stage tree.
+        // TODO: query dispatch should also send a cancel signal to the rest of the execution stage tree.
+        if (transferableBlock.isErrorBlock()) {
+          throw new RuntimeException("Received error query execution result 
block: "
+              + transferableBlock.getDataBlock().getExceptions());
+        }
         break;
       }
       if (transferableBlock.getDataBlock() != null) {
@@ -129,10 +136,11 @@ public class QueryDispatcher {
   }
 
   public static MailboxReceiveOperator 
createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
-      List<ServerInstance> sendingInstances, long jobId, int stageId, String 
hostname, int port) {
+      List<ServerInstance> sendingInstances, long jobId, int stageId, 
DataSchema dataSchema, String hostname,
+      int port) {
     MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, 
sendingInstances, hostname, port, jobId,
-            stageId);
+        new MailboxReceiveOperator(mailboxService, dataSchema, 
RelDistribution.Type.ANY, sendingInstances, hostname,
+            port, jobId, stageId);
     return mailboxReceiveOperator;
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 07c69a05ac..5661d9cae7 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,7 +24,9 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -66,8 +68,11 @@ public class GrpcMailboxServiceTest extends 
GrpcMailboxServiceTestBase {
   private Mailbox.MailboxContent getTestMailboxContent(String mailboxId)
       throws IOException {
     return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
-        .putAllMetadata(ImmutableMap.of("key", "value", "finished", "true"))
-        .setPayload(ByteString.copyFrom(new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()).toBytes()))
+        .putAllMetadata(ImmutableMap.of("key", "value", 
ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"))
+        .setPayload(ByteString.copyFrom(new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(new DataSchema(
+            new String[]{"foo", "bar"},
+            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING}))
+        ).toBytes()))
         .build();
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
new file mode 100644
index 0000000000..3b5a94d560
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.service.QueryDispatcher;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class QueryRunnerExceptionTest extends QueryRunnerTestBase {
+
+  @Test(dataProvider = "testDataWithSqlExecutionExceptions")
+  public void testSqlWithFinalRowCountChecker(String sql, String exeptionMsg) {
+    QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
+    Map<String, String> requestMetadataMap =
+        ImmutableMap.of("REQUEST_ID", 
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+    MailboxReceiveOperator mailboxReceiveOperator = null;
+    for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+      if (queryPlan.getQueryStageMap().get(stageId) instanceof 
MailboxReceiveNode) {
+        MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(stageId);
+        mailboxReceiveOperator = 
QueryDispatcher.createReduceStageOperator(_mailboxService,
+            
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), 
reduceNode.getSenderStageId(),
+            reduceNode.getDataSchema(), "localhost", _reducerGrpcPort);
+      } else {
+        for (ServerInstance serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+          DistributedStagePlan distributedStagePlan =
+              QueryDispatcher.constructDistributedStagePlan(queryPlan, 
stageId, serverInstance);
+          _servers.get(serverInstance).processQuery(distributedStagePlan, 
requestMetadataMap);
+        }
+      }
+    }
+    Preconditions.checkNotNull(mailboxReceiveOperator);
+
+    try {
+      QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator);
+    } catch (RuntimeException rte) {
+      Assert.assertTrue(rte.getMessage().contains("Received error query 
execution result block"));
+      Assert.assertTrue(rte.getMessage().contains(exeptionMsg));
+    }
+  }
+
+  @DataProvider(name = "testDataWithSqlExecutionExceptions")
+  private Object[][] provideTestSqlWithExecutionException() {
+    return new Object[][] {
+        // The default planner will auto-cast the string column to numeric on the JOIN
+        // condition, so the exception is "error while invoking cast function",
+        // because the cast cannot be performed.
+        new Object[]{"SELECT a.col2 - b.col3 FROM a JOIN b ON a.col1 = 
b.col1", "transform function: cast"},
+        new Object[]{"SELECT a.col2, b.col1 FROM a JOIN b ON a.col1 = b.col3", 
"transform function: cast"},
+    };
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 1142210de8..0fcddab91b 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -20,89 +20,21 @@ package org.apache.pinot.query.runtime;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
-import org.apache.pinot.query.QueryServerEnclosure;
-import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.service.QueryDispatcher;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
-
-
-public class QueryRunnerTest {
-  private static final Random RANDOM_REQUEST_ID_GEN = new Random();
-  private static final File INDEX_DIR_S1_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
-  private static final File INDEX_DIR_S1_B = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
-  private static final File INDEX_DIR_S2_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
-  private static final File INDEX_DIR_S1_C = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
-  private static final File INDEX_DIR_S2_C = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
-
-  private QueryEnvironment _queryEnvironment;
-  private String _reducerHostname;
-  private int _reducerGrpcPort;
-  private Map<ServerInstance, QueryServerEnclosure> _servers = new HashMap<>();
-  private GrpcMailboxService _mailboxService;
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    QueryServerEnclosure server1 = new 
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
-        ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", 
INDEX_DIR_S1_C),
-        QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
-    QueryServerEnclosure server2 = new 
QueryServerEnclosure(Lists.newArrayList("a", "c"),
-        ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C), 
QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
-
-    _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
-    _reducerHostname = String.format("Broker_%s", 
QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
-    Map<String, Object> reducerConfig = new HashMap<>();
-    reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
-    reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, 
_reducerHostname);
-    _mailboxService = new GrpcMailboxService(_reducerHostname, 
_reducerGrpcPort);
-    _mailboxService.start();
-
-    _queryEnvironment = 
QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, 
server1.getPort(),
-        server2.getPort());
-    server1.start();
-    server2.start();
-    // this doesn't test the QueryServer functionality so the server port can 
be the same as the mailbox port.
-    // this is only use for test identifier purpose.
-    int port1 = server1.getPort();
-    int port2 = server2.getPort();
-    _servers.put(new WorkerInstance("localhost", port1, port1, port1, port1), 
server1);
-    _servers.put(new WorkerInstance("localhost", port2, port2, port2, port2), 
server2);
-  }
 
-  @AfterClass
-  public void tearDown() {
-    DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
-    for (QueryServerEnclosure server : _servers.values()) {
-      server.shutDown();
-    }
-    _mailboxService.shutdown();
-  }
+
+public class QueryRunnerTest extends QueryRunnerTestBase {
 
   @Test(dataProvider = "testDataWithSqlToFinalRowCount")
   public void testSqlWithFinalRowCountChecker(String sql, int 
expectedRowCount) {
@@ -115,8 +47,8 @@ public class QueryRunnerTest {
         MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(stageId);
         mailboxReceiveOperator = 
QueryDispatcher.createReduceStageOperator(_mailboxService,
             
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
-            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), 
reduceNode.getSenderStageId(), "localhost",
-            _reducerGrpcPort);
+            Long.parseLong(requestMetadataMap.get("REQUEST_ID")), 
reduceNode.getSenderStageId(),
+            reduceNode.getDataSchema(), "localhost", _reducerGrpcPort);
       } else {
         for (ServerInstance serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
           DistributedStagePlan distributedStagePlan =
@@ -131,17 +63,6 @@ public class QueryRunnerTest {
     Assert.assertEquals(resultRows.size(), expectedRowCount);
   }
 
-  private static List<Object[]> toRows(List<DataTable> dataTables) {
-    List<Object[]> resultRows = new ArrayList<>();
-    for (DataTable dataTable : dataTables) {
-      int numRows = dataTable.getNumberOfRows();
-      for (int rowId = 0; rowId < numRows; rowId++) {
-        resultRows.add(extractRowFromDataTable(dataTable, rowId));
-      }
-    }
-    return resultRows;
-  }
-
   @DataProvider(name = "testDataWithSqlToFinalRowCount")
   private Object[][] provideTestSqlAndRowCount() {
     return new Object[][] {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
new file mode 100644
index 0000000000..e32ee1db49
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.service.QueryConfig;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import static 
org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
+
+
+public class QueryRunnerTestBase {
+  private static final File INDEX_DIR_S1_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
+  private static final File INDEX_DIR_S1_B = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
+  private static final File INDEX_DIR_S2_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
+  private static final File INDEX_DIR_S1_C = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
+  private static final File INDEX_DIR_S2_C = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
+
+  protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
+
+  protected QueryEnvironment _queryEnvironment;
+  protected String _reducerHostname;
+  protected int _reducerGrpcPort;
+  protected Map<ServerInstance, QueryServerEnclosure> _servers = new 
HashMap<>();
+  protected GrpcMailboxService _mailboxService;
+
+  protected static List<Object[]> toRows(List<DataTable> dataTables) {
+    List<Object[]> resultRows = new ArrayList<>();
+    for (DataTable dataTable : dataTables) {
+      int numRows = dataTable.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        resultRows.add(extractRowFromDataTable(dataTable, rowId));
+      }
+    }
+    return resultRows;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    QueryServerEnclosure server1 = new 
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
+        ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", 
INDEX_DIR_S1_C),
+        QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
+    QueryServerEnclosure server2 = new 
QueryServerEnclosure(Lists.newArrayList("a", "c"),
+        ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C), 
QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+
+    _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
+    _reducerHostname = String.format("Broker_%s", 
QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
+    Map<String, Object> reducerConfig = new HashMap<>();
+    reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
+    reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, 
_reducerHostname);
+    _mailboxService = new GrpcMailboxService(_reducerHostname, 
_reducerGrpcPort);
+    _mailboxService.start();
+
+    _queryEnvironment = 
QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, 
server1.getPort(),
+        server2.getPort());
+    server1.start();
+    server2.start();
+    // This doesn't test the QueryServer functionality, so the server port can be the same as the mailbox port.
+    // It is only used for test identification purposes.
+    int port1 = server1.getPort();
+    int port2 = server2.getPort();
+    _servers.put(new WorkerInstance("localhost", port1, port1, port1, port1), 
server1);
+    _servers.put(new WorkerInstance("localhost", port2, port2, port2, port2), 
server2);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
+    for (QueryServerEnclosure server : _servers.values()) {
+      server.shutDown();
+    }
+    _mailboxService.shutdown();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to