This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 529432c7d7 [multistage] Add Callbacks for Complete Events (#10564)
529432c7d7 is described below

commit 529432c7d7b26cb9690611196210f187def473a3
Author: Ankit Sultana <[email protected]>
AuthorDate: Sat Apr 22 02:27:13 2023 +0530

    [multistage] Add Callbacks for Complete Events (#10564)
    
    * [multistage] Add Callbacks for Complete Events
    * Add callback for cancel
    * Add tests
---
 .../pinot/query/mailbox/InMemorySendingMailbox.java       |  2 ++
 .../mailbox/channel/MailboxContentStreamObserver.java     |  1 +
 .../pinot/query/mailbox/GrpcMailboxServiceTest.java       |  8 +++++++-
 .../pinot/query/mailbox/InMemoryMailboxServiceTest.java   | 15 +++++++++++++--
 4 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 808011e2a2..18cdd5db36 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -52,6 +52,7 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
   public void complete()
       throws Exception {
     _transferStream.complete();
+    _gotMailCallback.accept(_mailboxId);
   }
 
   @Override
@@ -64,6 +65,7 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
     if (isInitialized() && !_transferStream.isCancelled()) {
       _transferStream.cancel();
     }
+    _gotMailCallback.accept(_mailboxId);
   }
 
   private void initialize() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index c8375d4eed..622dfe5742 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -148,6 +148,7 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
   @Override
   public void onCompleted() {
     _isCompleted.set(true);
+    _gotMailCallback.accept(_mailboxId);
     _responseObserver.onCompleted();
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 7d08439601..6f2802fa48 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -86,7 +86,11 @@ public class GrpcMailboxServiceTest {
     SendingMailbox<TransferableBlock> sendingMailbox = _mailboxService1.getSendingMailbox(mailboxId, deadlineMs);
     ReceivingMailbox<TransferableBlock> receivingMailbox = _mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
-    _mail2GotData.set(ignored -> gotData.countDown());
+    CountDownLatch timesCallbackCalled = new CountDownLatch(2);
+    _mail2GotData.set(ignored -> {
+      gotData.countDown();
+      timesCallbackCalled.countDown();
+    });
 
     // When:
     TransferableBlock testBlock = getTestTransferableBlock();
@@ -98,6 +102,8 @@ public class GrpcMailboxServiceTest {
     Assert.assertEquals(receivedBlock.getDataBlock().toBytes(), testBlock.getDataBlock().toBytes());
     sendingMailbox.complete();
 
+    Assert.assertTrue(timesCallbackCalled.await(1, TimeUnit.SECONDS));
+
     TestUtils.waitForCondition(aVoid -> {
       return receivingMailbox.isClosed();
     }, 5000L, "Receiving mailbox is not closed properly!");
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 78edcec092..386716332c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -49,7 +50,10 @@ public class InMemoryMailboxServiceTest {
   public void testHappyPath()
       throws Exception {
     long deadlineMs = System.currentTimeMillis() + 10_000;
-    InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { });
+    AtomicInteger timesCallbackCalled = new AtomicInteger(0);
+    InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> {
+      timesCallbackCalled.incrementAndGet();
+    });
     InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox) mailboxService.getReceivingMailbox(
         MAILBOX_ID);
     InMemorySendingMailbox sendingMailbox =
@@ -60,6 +64,8 @@ public class InMemoryMailboxServiceTest {
       sendingMailbox.send(getTestTransferableBlock(i, i + 1 == NUM_ENTRIES));
     }
     sendingMailbox.complete();
+    // The callback should be called for each send and complete call
+    Assert.assertEquals(NUM_ENTRIES + 1, timesCallbackCalled.get());
 
     // Iterate 1 less time than the loop above
     for (int i = 0; i + 1 < NUM_ENTRIES; i++) {
@@ -141,7 +147,10 @@ public class InMemoryMailboxServiceTest {
   public void testInMemoryStreamCancellationBySender()
       throws Exception {
     long deadlineMs = System.currentTimeMillis() + 10_000;
-    InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { });
+    AtomicInteger timesCallbackCalled = new AtomicInteger(0);
+    InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> {
+      timesCallbackCalled.incrementAndGet();
+    });
 
     SendingMailbox<TransferableBlock> sendingMailbox = mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
     ReceivingMailbox<TransferableBlock> receivingMailbox = mailboxService.getReceivingMailbox(MAILBOX_ID);
@@ -154,6 +163,8 @@ public class InMemoryMailboxServiceTest {
 
     sendingMailbox.cancel(new RuntimeException("foo"));
 
+    // If the sender cancels the stream, the receiver should get a callback
+    Assert.assertEquals(2, timesCallbackCalled.get());
     // After the stream is cancelled, receiver will get error-blocks
     receivedBlock = receivingMailbox.receive();
     Assert.assertNotNull(receivedBlock);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to