This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b03b5e434f [multistage] Fix data block transfer semantics and plan
node data schema usage. (#9064)
b03b5e434f is described below
commit b03b5e434f8eda78cbca3007d285453f1d48e1c4
Author: Rong Rong <[email protected]>
AuthorDate: Sat Jul 23 11:21:43 2022 -0700
[multistage] Fix data block transfer semantics and plan node data schema
usage. (#9064)
* fix data schema usage in stage to operator
* adding error handling to all block transfer
* carve out test files
* standardized channel metadata
* standardized block transfer
- always transfer data block then end-of-stream empty block (with or w/o
error)
- processing finishes when end-of-stream block is reached
- if end-of-stream is error, propagate
- if end-of-stream is no-error, send normal data block then another new
end-of-stream with proper schema
- errors get propagated all the way through to the end
- do random selection and singleton differently
* fix error block bubbling issue
* address diff comments
- remove empty block util, replace with end-of-stream
- remove schema in error block
- no longer repackage error block
- upstream error blocks are stored in the intermediate stage, so the next time
getBlock() is called it will return the error
- this should also log a warning, because once an error block is sent, there
should be no more upstream calls to getBlock()
Co-authored-by: Rong Rong <[email protected]>
---
.../MultiStageBrokerRequestHandler.java | 2 +-
.../pinot/core/common/datablock/BaseDataBlock.java | 4 +-
.../core/common/datablock/DataBlockUtils.java | 32 +++---
.../pinot/core/common/datablock/MetadataBlock.java | 4 +
.../pinot/core/common/datablock/DataBlockTest.java | 3 +-
.../apache/pinot/query/QueryEnvironmentTest.java | 20 ++++
.../pinot/query/QueryEnvironmentTestBase.java | 4 -
.../pinot/query/mailbox/channel/ChannelUtils.java | 29 +++++
.../channel/MailboxContentStreamObserver.java | 6 +-
.../channel/MailboxStatusStreamObserver.java | 5 +-
.../apache/pinot/query/runtime/QueryRunner.java | 82 ++++++++++++--
.../query/runtime/blocks/TransferableBlock.java | 55 ++++++++-
.../runtime/blocks/TransferableBlockUtils.java | 17 +--
.../runtime/executor/WorkerQueryExecutor.java | 21 ++--
.../query/runtime/operator/AggregateOperator.java | 82 +++++++-------
.../query/runtime/operator/HashJoinOperator.java | 57 ++++++----
.../runtime/operator/MailboxReceiveOperator.java | 17 ++-
.../runtime/operator/MailboxSendOperator.java | 124 ++++++++++-----------
.../query/runtime/operator/TransformOperator.java | 41 +++----
.../pinot/query/service/QueryDispatcher.java | 16 ++-
.../query/mailbox/GrpcMailboxServiceTest.java | 9 +-
.../query/runtime/QueryRunnerExceptionTest.java | 77 +++++++++++++
.../pinot/query/runtime/QueryRunnerTest.java | 87 +--------------
.../pinot/query/runtime/QueryRunnerTestBase.java | 109 ++++++++++++++++++
24 files changed, 600 insertions(+), 303 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index e992031288..d7b7a1c802 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -146,7 +146,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
try {
queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan,
_mailboxService, DEFAULT_TIMEOUT_NANO);
} catch (Exception e) {
- LOGGER.info("query submission failed", e);
+ LOGGER.info("query execution failed", e);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 2af725dc6f..395bc49d05 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -102,11 +102,11 @@ public abstract class BaseDataBlock implements DataTable {
* @param fixedSizeDataBytes byte[] for fix-sized columns.
* @param variableSizeDataBytes byte[] for variable length columns (arrays).
*/
- public BaseDataBlock(int numRows, DataSchema dataSchema, String[]
stringDictionary,
+ public BaseDataBlock(int numRows, @Nullable DataSchema dataSchema, String[]
stringDictionary,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
_numRows = numRows;
- _numColumns = dataSchema.size();
_dataSchema = dataSchema;
+ _numColumns = dataSchema == null ? 0 : dataSchema.size();
_stringDictionary = stringDictionary;
_fixedSizeDataBytes = fixedSizeDataBytes;
_fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index 1a26365210..d1d9185d11 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -21,11 +21,13 @@ package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
public final class DataBlockUtils {
@@ -34,29 +36,25 @@ public final class DataBlockUtils {
// do not instantiate.
}
- private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0],
new DataSchema.ColumnDataType[0]);
- private static final MetadataBlock EOS_DATA_BLOCK = new
MetadataBlock(EMPTY_SCHEMA);
- static {
- EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(),
"END_OF_STREAM");
- }
-
- public static MetadataBlock getEndOfStreamDataBlock() {
- return EOS_DATA_BLOCK;
- }
-
public static MetadataBlock getErrorDataBlock(Exception e) {
- MetadataBlock errorBlock = new MetadataBlock(EMPTY_SCHEMA);
- errorBlock._metadata.put(DataTable.MetadataKey.TABLE.getName(), "ERROR");
if (e instanceof ProcessingException) {
- errorBlock.addException(((ProcessingException) e).getErrorCode(),
e.getMessage());
+ return getErrorDataBlock(Collections.singletonMap(((ProcessingException)
e).getErrorCode(), e.getMessage()));
} else {
- errorBlock.addException(QueryException.UNKNOWN_ERROR_CODE,
e.getMessage());
+ return
getErrorDataBlock(Collections.singletonMap(QueryException.UNKNOWN_ERROR_CODE,
e.getMessage()));
+ }
+ }
+
+ public static MetadataBlock getErrorDataBlock(Map<Integer, String>
exceptions) {
+ MetadataBlock errorBlock = new MetadataBlock();
+ for (Map.Entry<Integer, String> exception : exceptions.entrySet()) {
+ errorBlock.addException(exception.getKey(), exception.getValue());
}
return errorBlock;
}
- public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) {
- return dataSchema == null ? EOS_DATA_BLOCK : new MetadataBlock(dataSchema);
+ public static MetadataBlock getEndOfStreamDataBlock(@Nonnull DataSchema
dataSchema) {
+ // TODO: add query statistics metadata for the block.
+ return new MetadataBlock(dataSchema);
}
public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index b100425e5a..d469ec0287 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -29,6 +29,10 @@ import org.apache.pinot.common.utils.DataSchema;
public class MetadataBlock extends BaseDataBlock {
private static final int VERSION = 1;
+ public MetadataBlock() {
+ super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+ }
+
public MetadataBlock(DataSchema dataSchema) {
super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 327fa241aa..f673d2433d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -48,8 +48,7 @@ public class DataBlockTest {
BaseDataBlock dataBlock =
DataBlockUtils.getErrorDataBlock(originalException);
dataBlock.addException(processingException);
- Assert.assertEquals(dataBlock.getDataSchema().getColumnNames().length, 0);
- Assert.assertEquals(dataBlock.getDataSchema().getColumnDataTypes().length,
0);
+ Assert.assertNull(dataBlock.getDataSchema());
Assert.assertEquals(dataBlock.getNumberOfRows(), 0);
// Assert processing exception and original exception both matches.
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
index 9f4778743b..143eabf0f0 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
@@ -55,6 +55,16 @@ public class QueryEnvironmentTest extends
QueryEnvironmentTestBase {
}
}
+ @Test(dataProvider = "testQueryExceptionDataProvider")
+ public void testQueryWithException(String query, String exceptionSnippet) {
+ try {
+ _queryEnvironment.planQuery(query);
+ Assert.fail("query plan should throw exception");
+ } catch (RuntimeException e) {
+ Assert.assertTrue(e.getCause().getMessage().contains(exceptionSnippet));
+ }
+ }
+
@Test
public void testQueryAndAssertStageContentForJoin()
throws Exception {
@@ -100,4 +110,14 @@ public class QueryEnvironmentTest extends
QueryEnvironmentTestBase {
"SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` =
`b`.`col2`\n" + "WHERE `a`.`col3` >= 0"},
};
}
+
+ @DataProvider(name = "testQueryExceptionDataProvider")
+ private Object[][] provideQueriesWithException() {
+ return new Object[][] {
+ // wrong table is being used after JOIN
+ new Object[]{"SELECT b.col1 - a.col3 FROM a JOIN c ON a.col1 =
c.col3", "Table 'b' not found"},
+ // non-agg column not being grouped
+ new Object[]{"SELECT a.col1, SUM(a.col3) FROM a", "'a.col1' is not
being grouped"},
+ };
+ }
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index a146f5ca3d..1c6ad7ff29 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -56,10 +56,6 @@ public class QueryEnvironmentTestBase {
+ " GROUP BY a.col1, a.col2"},
new Object[]{"SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 =
b.col2 "
+ " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY
a.col1"},
- new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON
MOD(a.col3, 2) = MOD(b.col3, 2)"},
- new Object[]{"SELECT a.col1, a.col3, i.maxVal FROM a JOIN "
- + " (SELECT b.col1 AS joinKey, MAX(b.col3) AS maxVal FROM b GROUP
BY b.col1) AS i "
- + " ON a.col1 = i.joinKey"},
};
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
new file mode 100644
index 0000000000..1662047263
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+public class ChannelUtils {
+
+ public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
+ public static final String MAILBOX_METADATA_END_OF_STREAM_KEY =
"end.of.stream";
+
+ private ChannelUtils() {
+ // do not instantiate.
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 8aeb560c8f..5b79549a27 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -91,9 +91,9 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
Mailbox.MailboxStatus.Builder builder =
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
- .putMetadata("buffer.size", String.valueOf(remainingCapacity));
- if (mailboxContent.getMetadataMap().get("finished") != null) {
- builder.putMetadata("finished", "true");
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
String.valueOf(remainingCapacity));
+ if
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
!= null) {
+ builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true");
}
Mailbox.MailboxStatus status = builder.build();
// returns the buffer available size to sender for rate controller /
throttling.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index 45233952e2..e482583bd3 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -63,8 +63,9 @@ public class MailboxStatusStreamObserver implements
StreamObserver<Mailbox.Mailb
// when received a mailbox status from the receiving end, sending end
update the known buffer size available
// so we can make better throughput send judgement. here is a simple
example.
// TODO: this feedback info is not used to throttle the send speed. it is
currently being discarded.
- if (mailboxStatus.getMetadataMap().containsKey("buffer.size")) {
-
_bufferSize.set(Integer.parseInt(mailboxStatus.getMetadataMap().get("buffer.size")));
+ if
(mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY))
{
+ _bufferSize.set(Integer.parseInt(
+
mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)));
} else {
_bufferSize.set(DEFAULT_MAILBOX_QUEUE_CAPACITY); // DEFAULT_AVAILABILITY;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index cf56c5b786..2f7f23f264 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,17 +18,21 @@
*/
package org.apache.pinot.query.runtime;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.transport.ServerInstance;
@@ -36,6 +40,7 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -101,25 +106,84 @@ public class QueryRunner {
BaseDataBlock dataBlock;
try {
DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest,
executorService, null);
- // this works because default DataTableImplV3 will have a version
number at beginning,
- // which maps to ROW type of version 3.
- dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
- } catch (IOException e) {
- throw new RuntimeException("Unable to convert byte buffer", e);
+ if (!dataTable.getExceptions().isEmpty()) {
+ // if contains exception, directly return a metadata block with the
exceptions.
+ dataBlock =
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+ } else {
+ // this works because default DataTableImplV3 will have a version
number at beginning:
+ // the new DataBlock encodes lower 16 bits as version and upper 16
bits as type (ROW, COLUMNAR, METADATA)
+ dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ }
+ } catch (Exception e) {
+ dataBlock = DataBlockUtils.getErrorDataBlock(e);
}
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
StageMetadata receivingStageMetadata =
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
MailboxSendOperator mailboxSendOperator =
- new MailboxSendOperator(_mailboxService, dataBlock,
receivingStageMetadata.getServerInstances(),
- sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_hostname, _port,
- serverQueryRequest.getRequestId(), sendNode.getStageId());
+ new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
+ new LeafStageTransferableBlockOperator(dataBlock,
sendNode.getDataSchema()),
+ receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(),
+ sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequest.getRequestId(),
+ sendNode.getStageId());
mailboxSendOperator.nextBlock();
+ if (dataBlock.getExceptions().isEmpty()) {
+ mailboxSendOperator.nextBlock();
+ }
} else {
_workerExecutor.processQuery(distributedStagePlan, requestMetadataMap,
executorService);
}
}
+ /**
+ * Leaf-stage transfer block opreator is used to wrap around the leaf stage
process results. They are passed to the
+ * Pinot server to execute query thus only one {@link DataTable} were
returned. However, to conform with the
+ * intermediate stage operators. an additional {@link MetadataBlock} needs
to be transfer after the data block.
+ *
+ * <p>In order to achieve this:
+ * <ul>
+ * <li>The leaf-stage result is split into data payload block and metadata
payload block.</li>
+ * <li>In case the leaf-stage result contains error or only metadata, we
skip the data payload block.</li>
+ * </ul>
+ */
+ private static class LeafStageTransferableBlockOperator extends
BaseOperator<TransferableBlock> {
+ private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
+
+ private final MetadataBlock _endOfStreamBlock;
+ private final BaseDataBlock _baseDataBlock;
+ private final DataSchema _dataSchema;
+ private boolean _hasTransferred;
+
+ private LeafStageTransferableBlockOperator(BaseDataBlock baseDataBlock,
DataSchema dataSchema) {
+ _baseDataBlock = baseDataBlock;
+ _dataSchema = dataSchema;
+ _endOfStreamBlock = baseDataBlock.getExceptions().isEmpty()
+ ? DataBlockUtils.getEndOfStreamDataBlock(dataSchema) : null;
+ _hasTransferred = false;
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ if (!_hasTransferred) {
+ _hasTransferred = true;
+ return new TransferableBlock(_baseDataBlock);
+ } else {
+ return new TransferableBlock(_endOfStreamBlock);
+ }
+ }
+ }
+
private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
int stageId = distributedStagePlan.getStageId();
ServerInstance serverInstance = distributedStagePlan.getServerInstance();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 79470fed8c..775a5de6ef 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.blocks;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
@@ -41,15 +42,22 @@ public class TransferableBlock implements Block {
private final BaseDataBlock.Type _type;
private final DataSchema _dataSchema;
+ private final boolean _isErrorBlock;
private BaseDataBlock _dataBlock;
-
private List<Object[]> _container;
public TransferableBlock(List<Object[]> container, DataSchema dataSchema,
BaseDataBlock.Type containerType) {
+ this(container, dataSchema, containerType, false);
+ }
+
+ @VisibleForTesting
+ TransferableBlock(List<Object[]> container, DataSchema dataSchema,
BaseDataBlock.Type containerType,
+ boolean isErrorBlock) {
_container = container;
_dataSchema = dataSchema;
_type = containerType;
+ _isErrorBlock = isErrorBlock;
}
public TransferableBlock(BaseDataBlock dataBlock) {
@@ -57,12 +65,20 @@ public class TransferableBlock implements Block {
_dataSchema = dataBlock.getDataSchema();
_type = dataBlock instanceof ColumnarDataBlock ?
BaseDataBlock.Type.COLUMNAR
: dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW :
BaseDataBlock.Type.METADATA;
+ _isErrorBlock = !_dataBlock.getExceptions().isEmpty();
}
public DataSchema getDataSchema() {
return _dataSchema;
}
+ /**
+ * Retrieve the extracted {@link TransferableBlock#_container} of the
transferable block.
+ * If not already constructed. It will use {@link DataBlockUtils} to extract
the row/columnar data from the
+ * binary-packed format.
+ *
+ * @return data container.
+ */
public List<Object[]> getContainer() {
if (_container == null) {
switch (_type) {
@@ -77,6 +93,13 @@ public class TransferableBlock implements Block {
return _container;
}
+ /**
+ * Retrieve the binary-packed version of the data block.
+ * If not already constructed. It will use {@link DataBlockBuilder} to
construct the binary-packed format from
+ * the {@link TransferableBlock#_container}.
+ *
+ * @return data block.
+ */
public BaseDataBlock getDataBlock() {
if (_dataBlock == null) {
try {
@@ -87,6 +110,8 @@ public class TransferableBlock implements Block {
case COLUMNAR:
_dataBlock = DataBlockBuilder.buildFromColumns(_container, null,
_dataSchema);
break;
+ case METADATA:
+ throw new UnsupportedOperationException("Metadata block cannot be
constructed from container");
default:
throw new UnsupportedOperationException("Unable to build from
container with type: " + _type);
}
@@ -97,10 +122,38 @@ public class TransferableBlock implements Block {
return _dataBlock;
}
+ /**
+ * Return the type of block (one of ROW, COLUMNAR, or METADATA).
+ *
+ * @return return type of block
+ */
public BaseDataBlock.Type getType() {
return _type;
}
+ /**
+ * Return whether a transferable block is at the end of a stream.
+ *
+ * <p>End of stream is different from data block with 0-rows. which can
indicate that one partition of the execution
+ * returns no rows. but that doesn't mean the rest of the partitions are
also finished.
+ * <p>When an exception is caught within a stream, no matter how many
outstanding data is pending to be received,
+ * it is considered end of stream because the exception should bubble up
immediately.
+ *
+ * @return whether this block is the end of stream.
+ */
+ public boolean isEndOfStreamBlock() {
+ return _type == BaseDataBlock.Type.METADATA;
+ }
+
+ /**
+ * Return whether a transferable block contains exception.
+ *
+ * @return true if contains exception.
+ */
+ public boolean isErrorBlock() {
+ return _isErrorBlock;
+ }
+
public byte[] toBytes()
throws IOException {
return _dataBlock.toBytes();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index f2992c41df..4900ec193f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -18,9 +18,8 @@
*/
package org.apache.pinot.query.runtime.blocks;
+import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
@@ -28,24 +27,20 @@ public final class TransferableBlockUtils {
private TransferableBlockUtils() {
// do not instantiate.
}
- private static final TransferableBlock EOS_TRANSFERABLE_BLOCK =
- new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
- public static TransferableBlock getEndOfStreamTransferableBlock() {
- return EOS_TRANSFERABLE_BLOCK;
+ public static TransferableBlock getEndOfStreamTransferableBlock(DataSchema
dataSchema) {
+ return new
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(dataSchema));
}
public static TransferableBlock getErrorTransferableBlock(Exception e) {
return new TransferableBlock(DataBlockUtils.getErrorDataBlock(e));
}
- public static TransferableBlock getEmptyTransferableBlock(DataSchema
dataSchema) {
- return new TransferableBlock(DataBlockUtils.getEmptyDataBlock(dataSchema));
+ public static TransferableBlock getErrorTransferableBlock(Map<Integer,
String> exceptions) {
+ return new TransferableBlock(DataBlockUtils.getErrorDataBlock(exceptions));
}
public static boolean isEndOfStream(TransferableBlock transferableBlock) {
- return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
- &&
"END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
- .get(DataTable.MetadataKey.TABLE.getName()));
+ return transferableBlock.isEndOfStreamBlock();
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index afe8617830..9c978f0ad9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -100,38 +100,39 @@ public class WorkerQueryExecutor {
}
// TODO: split this PhysicalPlanner into a separate module
+ // TODO: optimize this into a framework. (physical planner)
private BaseOperator<TransferableBlock> getOperator(long requestId,
StageNode stageNode,
Map<Integer, StageMetadata> metadataMap) {
- // TODO: optimize this into a framework. (physical planner)
if (stageNode instanceof MailboxReceiveNode) {
MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
List<ServerInstance> sendingInstances =
metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
- return new MailboxReceiveOperator(_mailboxService,
RelDistribution.Type.ANY, sendingInstances, _hostName, _port,
- requestId, receiveNode.getSenderStageId());
+ return new MailboxReceiveOperator(_mailboxService,
receiveNode.getDataSchema(), RelDistribution.Type.ANY,
+ sendingInstances, _hostName, _port, requestId,
receiveNode.getSenderStageId());
} else if (stageNode instanceof MailboxSendNode) {
MailboxSendNode sendNode = (MailboxSendNode) stageNode;
BaseOperator<TransferableBlock> nextOperator = getOperator(requestId,
sendNode.getInputs().get(0), metadataMap);
StageMetadata receivingStageMetadata =
metadataMap.get(sendNode.getReceiverStageId());
- return new MailboxSendOperator(_mailboxService, nextOperator,
receivingStageMetadata.getServerInstances(),
- sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_hostName, _port, requestId,
- sendNode.getStageId());
+ return new MailboxSendOperator(_mailboxService,
sendNode.getDataSchema(), nextOperator,
+ receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
+ _hostName, _port, requestId, sendNode.getStageId());
} else if (stageNode instanceof JoinNode) {
JoinNode joinNode = (JoinNode) stageNode;
BaseOperator<TransferableBlock> leftOperator = getOperator(requestId,
joinNode.getInputs().get(0), metadataMap);
BaseOperator<TransferableBlock> rightOperator = getOperator(requestId,
joinNode.getInputs().get(1), metadataMap);
- return new HashJoinOperator(leftOperator, rightOperator,
joinNode.getCriteria());
+ return new HashJoinOperator(leftOperator,
joinNode.getInputs().get(0).getDataSchema(), rightOperator,
+ joinNode.getInputs().get(1).getDataSchema(),
joinNode.getDataSchema(), joinNode.getCriteria());
} else if (stageNode instanceof AggregateNode) {
AggregateNode aggregateNode = (AggregateNode) stageNode;
BaseOperator<TransferableBlock> inputOperator =
getOperator(requestId, aggregateNode.getInputs().get(0),
metadataMap);
- return new AggregateOperator(inputOperator, aggregateNode.getAggCalls(),
aggregateNode.getGroupSet(),
- aggregateNode.getInputs().get(0).getDataSchema());
+ return new AggregateOperator(inputOperator,
aggregateNode.getDataSchema(), aggregateNode.getAggCalls(),
+ aggregateNode.getGroupSet(),
aggregateNode.getInputs().get(0).getDataSchema());
} else if (stageNode instanceof FilterNode) {
throw new UnsupportedOperationException("Unsupported!");
} else if (stageNode instanceof ProjectNode) {
ProjectNode projectNode = (ProjectNode) stageNode;
return new TransformOperator(getOperator(requestId,
projectNode.getInputs().get(0), metadataMap),
- projectNode.getProjects(),
projectNode.getInputs().get(0).getDataSchema());
+ projectNode.getDataSchema(), projectNode.getProjects(),
projectNode.getInputs().get(0).getDataSchema());
} else {
throw new UnsupportedOperationException(
String.format("Stage node type %s is not supported!",
stageNode.getClass().getSimpleName()));
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 1949cee595..200336ed77 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -29,8 +29,6 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -61,15 +59,17 @@ public class AggregateOperator extends
BaseOperator<TransferableBlock> {
private final Map<Integer, Object[]> _groupByKeyHolder;
private DataSchema _upstreamDataSchema;
+ private TransferableBlock _upstreamErrorBlock;
private boolean _isCumulativeBlockConstructed;
// TODO: refactor Pinot Reducer code to support the intermediate stage agg
operator.
- public AggregateOperator(BaseOperator<TransferableBlock> inputOperator,
List<RexExpression> aggCalls,
- List<RexExpression> groupSet, DataSchema upstreamDataSchema) {
+ public AggregateOperator(BaseOperator<TransferableBlock> inputOperator,
DataSchema dataSchema,
+ List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema
upstreamDataSchema) {
_inputOperator = inputOperator;
_aggCalls = aggCalls;
_groupSet = groupSet;
_upstreamDataSchema = upstreamDataSchema;
+ _upstreamErrorBlock = null;
_aggregationFunctions = new AggregationFunction[_aggCalls.size()];
_aggregationFunctionInputRefs = new int[_aggCalls.size()];
@@ -80,20 +80,7 @@ public class AggregateOperator extends
BaseOperator<TransferableBlock> {
_aggregationFunctions[i] = toAggregationFunction(aggCalls.get(i),
_aggregationFunctionInputRefs[i]);
_groupByResultHolders[i] = new HashMap<Integer, Object>();
}
-
- String[] columnNames = new String[_groupSet.size() + _aggCalls.size()];
- DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[_groupSet.size() + _aggCalls.size()];
- for (int i = 0; i < _groupSet.size(); i++) {
- int idx = ((RexExpression.InputRef) groupSet.get(i)).getIndex();
- columnNames[i] = _upstreamDataSchema.getColumnName(idx);
- columnDataTypes[i] = _upstreamDataSchema.getColumnDataType(idx);
- }
- for (int i = 0; i < _aggCalls.size(); i++) {
- int idx = i + _groupSet.size();
- columnNames[idx] = _aggregationFunctions[i].getColumnName();
- columnDataTypes[idx] =
_aggregationFunctions[i].getFinalResultColumnType();
- }
- _resultSchema = new DataSchema(columnNames, columnDataTypes);
+ _resultSchema = dataSchema;
_isCumulativeBlockConstructed = false;
}
@@ -119,15 +106,18 @@ public class AggregateOperator extends
BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
try {
- cumulateAggregationBlocks();
- return new TransferableBlock(toResultBlock());
+ consumeInputBlocks();
+ return produceAggregatedBlock();
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
}
- private BaseDataBlock toResultBlock()
+ private TransferableBlock produceAggregatedBlock()
throws IOException {
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
if (!_isCumulativeBlockConstructed) {
List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
for (Map.Entry<Integer, Object[]> e : _groupByKeyHolder.entrySet()) {
@@ -143,36 +133,42 @@ public class AggregateOperator extends
BaseOperator<TransferableBlock> {
}
_isCumulativeBlockConstructed = true;
if (rows.size() == 0) {
- return DataBlockUtils.getEmptyDataBlock(_resultSchema);
+ return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
} else {
- return DataBlockBuilder.buildFromRows(rows, null, _resultSchema);
+ return new TransferableBlock(rows, _resultSchema,
BaseDataBlock.Type.ROW);
}
} else {
- return DataBlockUtils.getEndOfStreamDataBlock();
+ return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
}
}
- private void cumulateAggregationBlocks() {
- TransferableBlock block = _inputOperator.nextBlock();
- while (!TransferableBlockUtils.isEndOfStream(block)) {
- BaseDataBlock dataBlock = block.getDataBlock();
- int numRows = dataBlock.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] row =
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
- Key key = extraRowKey(row, _groupSet);
- int keyHashCode = key.hashCode();
- _groupByKeyHolder.put(keyHashCode, key.getValues());
- for (int i = 0; i < _aggregationFunctions.length; i++) {
- Object currentRes = _groupByResultHolders[i].get(keyHashCode);
- if (currentRes == null) {
- _groupByResultHolders[i].put(keyHashCode,
row[_aggregationFunctionInputRefs[i]]);
- } else {
- _groupByResultHolders[i].put(keyHashCode,
- merge(_aggCalls.get(i), currentRes,
row[_aggregationFunctionInputRefs[i]]));
+ private void consumeInputBlocks() {
+ if (!_isCumulativeBlockConstructed) {
+ TransferableBlock block = _inputOperator.nextBlock();
+ while (!TransferableBlockUtils.isEndOfStream(block)) {
+ BaseDataBlock dataBlock = block.getDataBlock();
+ int numRows = dataBlock.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object[] row =
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ Key key = extraRowKey(row, _groupSet);
+ int keyHashCode = key.hashCode();
+ _groupByKeyHolder.put(keyHashCode, key.getValues());
+ for (int i = 0; i < _aggregationFunctions.length; i++) {
+ Object currentRes = _groupByResultHolders[i].get(keyHashCode);
+ if (currentRes == null) {
+ _groupByResultHolders[i].put(keyHashCode,
row[_aggregationFunctionInputRefs[i]]);
+ } else {
+ _groupByResultHolders[i].put(keyHashCode,
+ merge(_aggCalls.get(i), currentRes,
row[_aggregationFunctionInputRefs[i]]));
+ }
}
}
+ block = _inputOperator.nextBlock();
+ }
+ // setting upstream error block
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
}
- block = _inputOperator.nextBlock();
}
}
@@ -212,7 +208,7 @@ public class AggregateOperator extends
BaseOperator<TransferableBlock> {
return ((Number) left).doubleValue() + ((Number) right).doubleValue();
case "COUNT":
case "$COUNT":
- return (int) left + (int) right;
+ return ((Number) left).longValue() + ((Number) right).longValue();
case "MIN":
case "$MIN":
case "$MIN0":
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 5c5ac00c0f..ed9743e88a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
@@ -47,22 +48,29 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
private final BaseOperator<TransferableBlock> _leftTableOperator;
private final BaseOperator<TransferableBlock> _rightTableOperator;
-
- private DataSchema _leftTableSchema;
- private DataSchema _rightTableSchema;
- private int _resultRowSize;
+ private final DataSchema _resultSchema;
+ private final DataSchema _leftTableSchema;
+ private final DataSchema _rightTableSchema;
+ private final int _resultRowSize;
private boolean _isHashTableBuilt;
+ private TransferableBlock _upstreamErrorBlock;
private KeySelector<Object[], Object[]> _leftKeySelector;
private KeySelector<Object[], Object[]> _rightKeySelector;
- public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator,
- BaseOperator<TransferableBlock> rightTableOperator,
List<JoinNode.JoinClause> criteria) {
+ public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator,
DataSchema leftSchema,
+ BaseOperator<TransferableBlock> rightTableOperator, DataSchema
rightSchema, DataSchema outputSchema,
+ List<JoinNode.JoinClause> criteria) {
_leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
_rightKeySelector = criteria.get(0).getRightJoinKeySelector();
_leftTableOperator = leftTableOperator;
_rightTableOperator = rightTableOperator;
+ _resultSchema = outputSchema;
+ _leftTableSchema = leftSchema;
+ _rightTableSchema = rightSchema;
+ _resultRowSize = _resultSchema.size();
_isHashTableBuilt = false;
_broadcastHashTable = new HashMap<>();
+ _upstreamErrorBlock = null;
}
@Override
@@ -79,7 +87,12 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
+ // Build JOIN hash table
buildBroadcastHashTable();
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
+ // JOIN each left block with the right block.
try {
return buildJoinedDataBlock(_leftTableOperator.nextBlock());
} catch (Exception e) {
@@ -91,7 +104,6 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
if (!_isHashTableBuilt) {
TransferableBlock rightBlock = _rightTableOperator.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
- _rightTableSchema = rightBlock.getDataSchema();
List<Object[]> container = rightBlock.getContainer();
// put all the rows into corresponding hash collections keyed by the
key selector function.
for (Object[] row : container) {
@@ -101,27 +113,32 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
}
rightBlock = _rightTableOperator.nextBlock();
}
+ if (rightBlock.isErrorBlock()) {
+ _upstreamErrorBlock = rightBlock;
+ }
_isHashTableBuilt = true;
}
}
private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
throws Exception {
- if (TransferableBlockUtils.isEndOfStream(leftBlock)) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- }
- List<Object[]> rows = new ArrayList<>();
- _leftTableSchema = leftBlock.getDataSchema();
- _resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
- List<Object[]> container = leftBlock.getContainer();
- for (Object[] leftRow : container) {
- List<Object[]> hashCollection =
-
_broadcastHashTable.getOrDefault(_leftKeySelector.computeHash(leftRow),
Collections.emptyList());
- for (Object[] rightRow : hashCollection) {
- rows.add(joinRow(leftRow, rightRow));
+ if (!TransferableBlockUtils.isEndOfStream(leftBlock)) {
+ List<Object[]> rows = new ArrayList<>();
+ List<Object[]> container = leftBlock.getContainer();
+ for (Object[] leftRow : container) {
+ List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
+ _leftKeySelector.computeHash(leftRow), Collections.emptyList());
+ for (Object[] rightRow : hashCollection) {
+ rows.add(joinRow(leftRow, rightRow));
+ }
}
+ return new TransferableBlock(rows, computeSchema(),
BaseDataBlock.Type.ROW);
+ } else if (leftBlock.isErrorBlock()) {
+ _upstreamErrorBlock = leftBlock;
+ return _upstreamErrorBlock;
+ } else {
+ return new
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
}
- return new TransferableBlock(rows, computeSchema(),
BaseDataBlock.Type.ROW);
}
private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 0b8d88cd00..014cf49bed 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -50,14 +51,17 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
private final MailboxService<Mailbox.MailboxContent> _mailboxService;
private final RelDistribution.Type _exchangeType;
private final List<ServerInstance> _sendingStageInstances;
+ private final DataSchema _dataSchema;
private final String _hostName;
private final int _port;
private final long _jobId;
private final int _stageId;
+ private TransferableBlock _upstreamErrorBlock;
- public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent>
mailboxService,
+ public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent>
mailboxService, DataSchema dataSchema,
RelDistribution.Type exchangeType, List<ServerInstance>
sendingStageInstances, String hostName, int port,
long jobId, int stageId) {
+ _dataSchema = dataSchema;
_mailboxService = mailboxService;
_exchangeType = exchangeType;
_sendingStageInstances = sendingStageInstances;
@@ -65,6 +69,7 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
_port = port;
_jobId = jobId;
_stageId = stageId;
+ _upstreamErrorBlock = null;
}
@Override
@@ -81,9 +86,11 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
// TODO: do a round robin check against all MailboxContentStreamObservers
and find which one that has data.
boolean hasOpenedMailbox = true;
- DataSchema dataSchema = null;
long timeoutWatermark = System.nanoTime() + DEFAULT_TIMEOUT_NANO;
while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
hasOpenedMailbox = false;
@@ -100,6 +107,10 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
ByteBuffer byteBuffer =
mailboxContent.getPayload().asReadOnlyByteBuffer();
if (byteBuffer.hasRemaining()) {
BaseDataBlock dataBlock =
DataBlockUtils.getDataBlock(byteBuffer);
+ if (dataBlock instanceof MetadataBlock &&
!dataBlock.getExceptions().isEmpty()) {
+ _upstreamErrorBlock =
TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+ return _upstreamErrorBlock;
+ }
if (dataBlock.getNumberOfRows() > 0) {
// here we only return data table block when it is not empty.
return new TransferableBlock(dataBlock);
@@ -117,7 +128,7 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
}
// TODO: we need to at least return one data table with schema if there's
no error.
// we need to condition this on whether there's already things being
returned or not.
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
}
public RelDistribution.Type getExchangeType() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 632c617d4e..bcd49f0b9a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -24,21 +24,23 @@ import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.proto.Mailbox;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -55,6 +57,7 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
ImmutableSet.of(RelDistribution.Type.SINGLETON,
RelDistribution.Type.RANDOM_DISTRIBUTED,
RelDistribution.Type.BROADCAST_DISTRIBUTED,
RelDistribution.Type.HASH_DISTRIBUTED);
+ private static final Random RANDOM = new Random();
private final List<ServerInstance> _receivingStageInstances;
private final RelDistribution.Type _exchangeType;
@@ -64,13 +67,14 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
private final long _jobId;
private final int _stageId;
private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+ private final DataSchema _dataSchema;
private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
- private BaseDataBlock _dataTable;
- public MailboxSendOperator(MailboxService<Mailbox.MailboxContent>
mailboxService,
+ public MailboxSendOperator(MailboxService<Mailbox.MailboxContent>
mailboxService, DataSchema dataSchema,
BaseOperator<TransferableBlock> dataTableBlockBaseOperator,
List<ServerInstance> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]>
keySelector, String hostName, int port,
long jobId, int stageId) {
+ _dataSchema = dataSchema;
_mailboxService = mailboxService;
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
_receivingStageInstances = receivingStageInstances;
@@ -84,25 +88,6 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
String.format("Exchange type '%s' is not supported yet",
_exchangeType));
}
- /**
- * This is a temporary interface for connecting with server API.
remove/merge with InstanceResponseOperator once
- * we create a {@link
org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl} that can handle
the
- * creation of MailboxSendOperator we should not use this API.
- */
- public MailboxSendOperator(MailboxService<Mailbox.MailboxContent>
mailboxService, BaseDataBlock dataTable,
- List<ServerInstance> receivingStageInstances, RelDistribution.Type
exchangeType,
- KeySelector<Object[], Object[]> keySelector, String hostName, int port,
long jobId, int stageId) {
- _mailboxService = mailboxService;
- _dataTable = dataTable;
- _receivingStageInstances = receivingStageInstances;
- _exchangeType = exchangeType;
- _keySelector = keySelector;
- _serverHostName = hostName;
- _serverPort = port;
- _jobId = jobId;
- _stageId = stageId;
- }
-
@Override
public List<Operator> getChildOperators() {
// WorkerExecutor doesn't use getChildOperators, returns null here.
@@ -117,43 +102,40 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
- BaseDataBlock dataTable;
+ BaseDataBlock dataBlock;
TransferableBlock transferableBlock = null;
boolean isEndOfStream;
- if (_dataTableBlockBaseOperator != null) {
- transferableBlock = _dataTableBlockBaseOperator.nextBlock();
- dataTable = transferableBlock.getDataBlock();
- isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
- } else {
- dataTable = _dataTable;
- isEndOfStream = true;
- }
+ transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+ dataBlock = transferableBlock.getDataBlock();
+ isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
try {
switch (_exchangeType) {
- // TODO: random and singleton distribution should've been selected in
planning phase.
case SINGLETON:
+ // TODO: singleton or random distribution should've been
distinguished in planning phase.
case RANDOM_DISTRIBUTED:
- // TODO: make random distributed actually random, this impl only
sends data to the first instances.
- for (ServerInstance serverInstance : _receivingStageInstances) {
- sendDataTableBlock(serverInstance, dataTable, isEndOfStream);
- // we no longer need to send data to the rest of the receiving
instances, but we still need to transfer
- // the dataTable over indicating that we are a potential sender.
thus next time a random server is selected
- // it might still be useful.
- dataTable =
DataBlockUtils.getEmptyDataBlock(dataTable.getDataSchema());
+ if (isEndOfStream) {
+ for (ServerInstance serverInstance : _receivingStageInstances) {
+ sendDataTableBlock(serverInstance, dataBlock);
+ }
+ } else {
+ int randomInstanceIdx = _exchangeType ==
RelDistribution.Type.SINGLETON ? 0
+ : RANDOM.nextInt(_receivingStageInstances.size());
+ ServerInstance randomInstance =
_receivingStageInstances.get(randomInstanceIdx);
+ sendDataTableBlock(randomInstance, dataBlock);
}
break;
case BROADCAST_DISTRIBUTED:
for (ServerInstance serverInstance : _receivingStageInstances) {
- sendDataTableBlock(serverInstance, dataTable, isEndOfStream);
+ sendDataTableBlock(serverInstance, dataBlock);
}
break;
case HASH_DISTRIBUTED:
// TODO: ensure that server instance list is sorted using same
function in sender.
- List<BaseDataBlock> dataTableList =
constructPartitionedDataBlock(dataTable, _keySelector,
- _receivingStageInstances.size());
+ List<BaseDataBlock> dataTableList =
constructPartitionedDataBlock(dataBlock, _keySelector,
+ _receivingStageInstances.size(), isEndOfStream);
for (int i = 0; i < _receivingStageInstances.size(); i++) {
- sendDataTableBlock(_receivingStageInstances.get(i),
dataTableList.get(i), isEndOfStream);
+ sendDataTableBlock(_receivingStageInstances.get(i),
dataTableList.get(i));
}
break;
case RANGE_DISTRIBUTED:
@@ -168,43 +150,51 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
return transferableBlock;
}
- private static List<BaseDataBlock> constructPartitionedDataBlock(DataTable
dataTable,
- KeySelector<Object[], Object[]> keySelector, int partitionSize)
+ private static List<BaseDataBlock>
constructPartitionedDataBlock(BaseDataBlock dataBlock,
+ KeySelector<Object[], Object[]> keySelector, int partitionSize, boolean
isEndOfStream)
throws Exception {
- List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
- for (int i = 0; i < partitionSize; i++) {
- temporaryRows.add(new ArrayList<>());
- }
- for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
- Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable,
rowId);
- int partitionId = keySelector.computeHash(row) % partitionSize;
- temporaryRows.get(partitionId).add(row);
- }
- List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
- for (int i = 0; i < partitionSize; i++) {
- List<Object[]> objects = temporaryRows.get(i);
- dataTableList.add(DataBlockBuilder.buildFromRows(objects, null,
dataTable.getDataSchema()));
+ if (isEndOfStream) {
+ List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
+ for (int i = 0; i < partitionSize; i++) {
+ dataTableList.add(dataBlock);
+ }
+ return dataTableList;
+ } else {
+ List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize);
+ for (int i = 0; i < partitionSize; i++) {
+ temporaryRows.add(new ArrayList<>());
+ }
+ for (int rowId = 0; rowId < dataBlock.getNumberOfRows(); rowId++) {
+ Object[] row =
SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ int partitionId = keySelector.computeHash(row) % partitionSize;
+ temporaryRows.get(partitionId).add(row);
+ }
+ List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize);
+ for (int i = 0; i < partitionSize; i++) {
+ List<Object[]> objects = temporaryRows.get(i);
+ dataTableList.add(DataBlockBuilder.buildFromRows(objects, null,
dataBlock.getDataSchema()));
+ }
+ return dataTableList;
}
- return dataTableList;
}
- private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock
dataTable, boolean isEndOfStream)
+ private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock
dataBlock)
throws IOException {
String mailboxId = toMailboxId(serverInstance);
SendingMailbox<Mailbox.MailboxContent> sendingMailbox =
_mailboxService.getSendingMailbox(mailboxId);
- Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId,
dataTable, isEndOfStream);
+ Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId,
dataBlock);
sendingMailbox.send(mailboxContent);
- if (mailboxContent.getMetadataMap().containsKey("finished")) {
+ if
(mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY))
{
sendingMailbox.complete();
}
}
- private Mailbox.MailboxContent toMailboxContent(String mailboxId,
BaseDataBlock dataTable, boolean isEndOfStream)
+ private Mailbox.MailboxContent toMailboxContent(String mailboxId,
BaseDataBlock dataBlock)
throws IOException {
Mailbox.MailboxContent.Builder builder =
Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
- .setPayload(ByteString.copyFrom(new
TransferableBlock(dataTable).toBytes()));
- if (isEndOfStream) {
- builder.putMetadata("finished", "true");
+ .setPayload(ByteString.copyFrom(new
TransferableBlock(dataBlock).toBytes()));
+ if (dataBlock instanceof MetadataBlock) {
+ builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true");
}
return builder.build();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 6744e35d99..7c2369e6c0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.function.FunctionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -44,22 +45,17 @@ public class TransformOperator extends
BaseOperator<TransferableBlock> {
private final List<TransformOperands> _transformOperandsList;
private final int _resultColumnSize;
private final DataSchema _resultSchema;
+ private TransferableBlock _upstreamErrorBlock;
- public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator,
List<RexExpression> transforms,
- DataSchema upstreamDataSchema) {
+ public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator,
DataSchema dataSchema,
+ List<RexExpression> transforms, DataSchema upstreamDataSchema) {
_upstreamOperator = upstreamOperator;
_resultColumnSize = transforms.size();
_transformOperandsList = new ArrayList<>(_resultColumnSize);
for (RexExpression rexExpression : transforms) {
_transformOperandsList.add(TransformOperands.toFunctionOperands(rexExpression,
upstreamDataSchema));
}
- String[] columnNames = new String[_resultColumnSize];
- DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[_resultColumnSize];
- for (int i = 0; i < _resultColumnSize; i++) {
- columnNames[i] = _transformOperandsList.get(i).getResultName();
- columnDataTypes[i] = _transformOperandsList.get(i).getResultType();
- }
- _resultSchema = new DataSchema(columnNames, columnDataTypes);
+ _resultSchema = dataSchema;
}
@Override
@@ -85,19 +81,26 @@ public class TransformOperator extends
BaseOperator<TransferableBlock> {
private TransferableBlock transform(TransferableBlock block)
throws Exception {
- if (TransferableBlockUtils.isEndOfStream(block)) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
}
- List<Object[]> resultRows = new ArrayList<>();
- List<Object[]> container = block.getContainer();
- for (Object[] row : container) {
- Object[] resultRow = new Object[_resultColumnSize];
- for (int i = 0; i < _resultColumnSize; i++) {
- resultRow[i] = _transformOperandsList.get(i).apply(row);
+ if (!TransferableBlockUtils.isEndOfStream(block)) {
+ List<Object[]> resultRows = new ArrayList<>();
+ List<Object[]> container = block.getContainer();
+ for (Object[] row : container) {
+ Object[] resultRow = new Object[_resultColumnSize];
+ for (int i = 0; i < _resultColumnSize; i++) {
+ resultRow[i] = _transformOperandsList.get(i).apply(row);
+ }
+ resultRows.add(resultRow);
}
- resultRows.add(resultRow);
+ return new TransferableBlock(resultRows, _resultSchema,
BaseDataBlock.Type.ROW);
+ } else if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
+ return _upstreamErrorBlock;
+ } else {
+ return new
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
}
- return new TransferableBlock(resultRows, _resultSchema,
BaseDataBlock.Type.ROW);
}
private static abstract class TransformOperands {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index f0236eb7ff..70989caca0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.transport.ServerInstance;
@@ -64,7 +65,7 @@ public class QueryDispatcher {
MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(reduceStageId);
MailboxReceiveOperator mailboxReceiveOperator =
createReduceStageOperator(mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
- requestId, reduceNode.getSenderStageId(), mailboxService.getHostname(),
+ requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
mailboxService.getHostname(),
mailboxService.getMailboxPort());
return reduceMailboxReceive(mailboxReceiveOperator);
}
@@ -118,6 +119,12 @@ public class QueryDispatcher {
while (true) {
transferableBlock = mailboxReceiveOperator.nextBlock();
if (TransferableBlockUtils.isEndOfStream(transferableBlock)) {
+ // TODO: we only received bubble up error from the execution stage
tree.
+ // TODO: query dispatch should also send cancel signal to the rest of
the execution stage tree.
+ if (transferableBlock.isErrorBlock()) {
+ throw new RuntimeException("Received error query execution result
block: "
+ + transferableBlock.getDataBlock().getExceptions());
+ }
break;
}
if (transferableBlock.getDataBlock() != null) {
@@ -129,10 +136,11 @@ public class QueryDispatcher {
}
public static MailboxReceiveOperator
createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
- List<ServerInstance> sendingInstances, long jobId, int stageId, String
hostname, int port) {
+ List<ServerInstance> sendingInstances, long jobId, int stageId,
DataSchema dataSchema, String hostname,
+ int port) {
MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY,
sendingInstances, hostname, port, jobId,
- stageId);
+ new MailboxReceiveOperator(mailboxService, dataSchema,
RelDistribution.Type.ANY, sendingInstances, hostname,
+ port, jobId, stageId);
return mailboxReceiveOperator;
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 07c69a05ac..5661d9cae7 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,7 +24,9 @@ import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -66,8 +68,11 @@ public class GrpcMailboxServiceTest extends
GrpcMailboxServiceTestBase {
private Mailbox.MailboxContent getTestMailboxContent(String mailboxId)
throws IOException {
return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
- .putAllMetadata(ImmutableMap.of("key", "value", "finished", "true"))
- .setPayload(ByteString.copyFrom(new
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()).toBytes()))
+ .putAllMetadata(ImmutableMap.of("key", "value",
ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"))
+ .setPayload(ByteString.copyFrom(new
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(new DataSchema(
+ new String[]{"foo", "bar"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING}))
+ ).toBytes()))
.build();
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
new file mode 100644
index 0000000000..3b5a94d560
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.service.QueryDispatcher;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class QueryRunnerExceptionTest extends QueryRunnerTestBase {
+
+ @Test(dataProvider = "testDataWithSqlExecutionExceptions")
+ public void testSqlWithFinalRowCountChecker(String sql, String exeptionMsg) {
+ QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
+ Map<String, String> requestMetadataMap =
+ ImmutableMap.of("REQUEST_ID",
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+ MailboxReceiveOperator mailboxReceiveOperator = null;
+ for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+ if (queryPlan.getQueryStageMap().get(stageId) instanceof
MailboxReceiveNode) {
+ MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(stageId);
+ mailboxReceiveOperator =
QueryDispatcher.createReduceStageOperator(_mailboxService,
+
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
+ Long.parseLong(requestMetadataMap.get("REQUEST_ID")),
reduceNode.getSenderStageId(),
+ reduceNode.getDataSchema(), "localhost", _reducerGrpcPort);
+ } else {
+ for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+ DistributedStagePlan distributedStagePlan =
+ QueryDispatcher.constructDistributedStagePlan(queryPlan,
stageId, serverInstance);
+ _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
+ }
+ }
+ }
+ Preconditions.checkNotNull(mailboxReceiveOperator);
+
+ try {
+ QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator);
+ } catch (RuntimeException rte) {
+ Assert.assertTrue(rte.getMessage().contains("Received error query
execution result block"));
+ Assert.assertTrue(rte.getMessage().contains(exeptionMsg));
+ }
+ }
+
+ @DataProvider(name = "testDataWithSqlExecutionExceptions")
+ private Object[][] provideTestSqlWithExecutionException() {
+ return new Object[][] {
+ // default planner will auto-cast string column to numeric on JOIN
condition, so exception is:
+ // "error while invoking cast function", because the cast cannot be
done.
+ new Object[]{"SELECT a.col2 - b.col3 FROM a JOIN b ON a.col1 =
b.col1", "transform function: cast"},
+ new Object[]{"SELECT a.col2, b.col1 FROM a JOIN b ON a.col1 = b.col3",
"transform function: cast"},
+ };
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 1142210de8..0fcddab91b 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -20,89 +20,21 @@ package org.apache.pinot.query.runtime;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
-import org.apache.pinot.query.QueryServerEnclosure;
-import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.service.QueryDispatcher;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static
org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
-
-
-public class QueryRunnerTest {
- private static final Random RANDOM_REQUEST_ID_GEN = new Random();
- private static final File INDEX_DIR_S1_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
- private static final File INDEX_DIR_S1_B = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
- private static final File INDEX_DIR_S2_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
- private static final File INDEX_DIR_S1_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
- private static final File INDEX_DIR_S2_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
-
- private QueryEnvironment _queryEnvironment;
- private String _reducerHostname;
- private int _reducerGrpcPort;
- private Map<ServerInstance, QueryServerEnclosure> _servers = new HashMap<>();
- private GrpcMailboxService _mailboxService;
-
- @BeforeClass
- public void setUp()
- throws Exception {
- DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- QueryServerEnclosure server1 = new
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
- ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c",
INDEX_DIR_S1_C),
- QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
- QueryServerEnclosure server2 = new
QueryServerEnclosure(Lists.newArrayList("a", "c"),
- ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C),
QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
-
- _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
- _reducerHostname = String.format("Broker_%s",
QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
- Map<String, Object> reducerConfig = new HashMap<>();
- reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
- reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
_reducerHostname);
- _mailboxService = new GrpcMailboxService(_reducerHostname,
_reducerGrpcPort);
- _mailboxService.start();
-
- _queryEnvironment =
QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort,
server1.getPort(),
- server2.getPort());
- server1.start();
- server2.start();
- // this doesn't test the QueryServer functionality so the server port can
be the same as the mailbox port.
- // this is only use for test identifier purpose.
- int port1 = server1.getPort();
- int port2 = server2.getPort();
- _servers.put(new WorkerInstance("localhost", port1, port1, port1, port1),
server1);
- _servers.put(new WorkerInstance("localhost", port2, port2, port2, port2),
server2);
- }
- @AfterClass
- public void tearDown() {
- DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
- for (QueryServerEnclosure server : _servers.values()) {
- server.shutDown();
- }
- _mailboxService.shutdown();
- }
+
+public class QueryRunnerTest extends QueryRunnerTestBase {
@Test(dataProvider = "testDataWithSqlToFinalRowCount")
public void testSqlWithFinalRowCountChecker(String sql, int
expectedRowCount) {
@@ -115,8 +47,8 @@ public class QueryRunnerTest {
MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(stageId);
mailboxReceiveOperator =
QueryDispatcher.createReduceStageOperator(_mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
- Long.parseLong(requestMetadataMap.get("REQUEST_ID")),
reduceNode.getSenderStageId(), "localhost",
- _reducerGrpcPort);
+ Long.parseLong(requestMetadataMap.get("REQUEST_ID")),
reduceNode.getSenderStageId(),
+ reduceNode.getDataSchema(), "localhost", _reducerGrpcPort);
} else {
for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
DistributedStagePlan distributedStagePlan =
@@ -131,17 +63,6 @@ public class QueryRunnerTest {
Assert.assertEquals(resultRows.size(), expectedRowCount);
}
- private static List<Object[]> toRows(List<DataTable> dataTables) {
- List<Object[]> resultRows = new ArrayList<>();
- for (DataTable dataTable : dataTables) {
- int numRows = dataTable.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- resultRows.add(extractRowFromDataTable(dataTable, rowId));
- }
- }
- return resultRows;
- }
-
@DataProvider(name = "testDataWithSqlToFinalRowCount")
private Object[][] provideTestSqlAndRowCount() {
return new Object[][] {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
new file mode 100644
index 0000000000..e32ee1db49
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.service.QueryConfig;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import static
org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
+
+
+public class QueryRunnerTestBase {
+ private static final File INDEX_DIR_S1_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
+ private static final File INDEX_DIR_S1_B = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
+ private static final File INDEX_DIR_S2_A = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
+ private static final File INDEX_DIR_S1_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
+ private static final File INDEX_DIR_S2_C = new
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
+
+ protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
+
+ protected QueryEnvironment _queryEnvironment;
+ protected String _reducerHostname;
+ protected int _reducerGrpcPort;
+ protected Map<ServerInstance, QueryServerEnclosure> _servers = new
HashMap<>();
+ protected GrpcMailboxService _mailboxService;
+
+ protected static List<Object[]> toRows(List<DataTable> dataTables) {
+ List<Object[]> resultRows = new ArrayList<>();
+ for (DataTable dataTable : dataTables) {
+ int numRows = dataTable.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ resultRows.add(extractRowFromDataTable(dataTable, rowId));
+ }
+ }
+ return resultRows;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+ QueryServerEnclosure server1 = new
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
+ ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c",
INDEX_DIR_S1_C),
+ QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
+ QueryServerEnclosure server2 = new
QueryServerEnclosure(Lists.newArrayList("a", "c"),
+ ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C),
QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+
+ _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
+ _reducerHostname = String.format("Broker_%s",
QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
+ Map<String, Object> reducerConfig = new HashMap<>();
+ reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
+ reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
_reducerHostname);
+ _mailboxService = new GrpcMailboxService(_reducerHostname,
_reducerGrpcPort);
+ _mailboxService.start();
+
+ _queryEnvironment =
QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort,
server1.getPort(),
+ server2.getPort());
+ server1.start();
+ server2.start();
+ // this doesn't test the QueryServer functionality so the server port can
be the same as the mailbox port.
+ // this is only used for test identification purposes.
+ int port1 = server1.getPort();
+ int port2 = server2.getPort();
+ _servers.put(new WorkerInstance("localhost", port1, port1, port1, port1),
server1);
+ _servers.put(new WorkerInstance("localhost", port2, port2, port2, port2),
server2);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
+ for (QueryServerEnclosure server : _servers.values()) {
+ server.shutDown();
+ }
+ _mailboxService.shutdown();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]