gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1289877958
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -118,46 +118,52 @@ public String toExplainString() {
@Override
protected TransferableBlock getNextBlock() {
- boolean canContinue = true;
TransferableBlock transferableBlock;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("==[SEND]== Enter getNextBlock from: " + _context.getId());
+ }
try {
transferableBlock = _sourceOperator.nextBlock();
- if (transferableBlock.isNoOpBlock()) {
- return transferableBlock;
- } else if (transferableBlock.isEndOfStreamBlock()) {
- if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
- // Stats need to be populated here because the block is being sent
to the mailbox
- // and the receiving opChain will not be able to access the stats
from the previous opChain
- TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
-
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- sendTransferableBlock(eosBlockWithStats);
- } else {
- sendTransferableBlock(transferableBlock);
- }
- } else { // normal blocks
- // check whether we should continue depending on exchange queue
condition.
- canContinue = sendTransferableBlock(transferableBlock);
+ if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+ // Stats need to be populated here because the block is being sent to
the mailbox
+ // and the receiving opChain will not be able to access the stats from
the previous opChain
+ TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
+
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+ sendTransferableBlock(eosBlockWithStats, false);
+ } else {
+ sendTransferableBlock(transferableBlock, true);
}
} catch (Exception e) {
transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
try {
LOGGER.error("Exception while transferring data on opChain: " +
_context.getId(), e);
- sendTransferableBlock(transferableBlock);
+ sendTransferableBlock(transferableBlock, false);
} catch (Exception e2) {
LOGGER.error("Exception while sending error block.", e2);
}
}
- // yield if we cannot continue to put transferable block into the sending
queue
- return canContinue ? transferableBlock :
TransferableBlockUtils.getNoOpTransferableBlock();
+ return transferableBlock;
}
- private boolean sendTransferableBlock(TransferableBlock block)
+ private void sendTransferableBlock(TransferableBlock block, boolean
throwIfTimeout)
Review Comment:
The idea here is that sometimes we don't want to do that. Specially when we
found an exception in the happy path. There we send the exception downstream as
an error block. In case that error blocks times out... what do we want to do?
If we decide to throw a timeout there we would hide the original exception.
IIRC @walterddr's original code is the one that introduce this _I don't want to
throw timeout_ parameter and I though it was a good idea. But we can discuss
about that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]