walterddr commented on code in PR #10425:
URL: https://github.com/apache/pinot/pull/10425#discussion_r1137817503
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -185,4 +187,13 @@ private void cancelOpChain(OpChain opChain, Throwable e) {
_scheduler.deregister(opChain);
}
}
+
+ private String getErrorMessage(TransferableBlock block) {
Review Comment:
let's put this in transferable block util. there's already something similar
in DataBlockUtils as well
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java:
##########
@@ -294,6 +295,10 @@ public void testStreamCancellationBySender()
GrpcSendingMailbox grpcSendingMailbox =
(GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId,
deadlineMs);
+ // Do cancellations immediately for this test
+ grpcSendingMailbox = Mockito.spy(grpcSendingMailbox);
+ Mockito.doReturn(0L).when(grpcSendingMailbox).getCancellationDelayMs();
Review Comment:
is there any test we can add for this cancellation?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -84,14 +93,15 @@ public boolean isInitialized() {
public void cancel(Throwable t) {
if (_initialized.get() && !_statusObserver.isFinished()) {
LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
- try {
- _mailboxContentStreamObserver.onError(Status.fromThrowable(
- new RuntimeException("Cancelled by the
sender")).asRuntimeException());
- } catch (Exception e) {
- // TODO: We don't necessarily need to log this since this is
relatively quite likely to happen. Logging this
- // anyways as info for now so we can see how frequently this happens.
- LOGGER.info("Unexpected error issuing onError to
MailboxContentStreamObserver: {}", e.getMessage());
- }
+ DELAYED_CANCEL_SCHEDULER.schedule(() -> {
+ try {
+
_mailboxContentStreamObserver.onError(Status.CANCELLED.asRuntimeException());
Review Comment:
shouldn't the onError call be propagate to the receiving side? and populate
the errorContent block? how come the receiving end is still error out on
timeout?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]