walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r997102381


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -85,4 +104,27 @@ private boolean waitForInitialize()
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the received MailboxContent didn't have any rows.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.

Review Comment:
   Same as above: please change the wording in the Javadoc.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {

Review Comment:
   I think the part that confuses me is this `InMemoryChannel`:
   
   1. It is not the equivalent of the `ManagedChannel` in `ChannelManager`; it is actually a wrapper around the blocking queue.
   2. The class it most closely resembles is `MailboxContentStreamObserver`.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }
+
+  public void complete() {
+    _completed = true;
+  }
+
+  public boolean isCompleted() {
+    return _completed;
+  }

Review Comment:
   ignored this comment. 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }

Review Comment:
   In the gRPC mailbox, the mailbox itself doesn't keep any information about a completion signal, which makes sense because the sender is on the other side of an RPC:
   - For example, the last metadata block might get lost during transport.
   - gRPC helps to make sure the `onCompleted()` method is called.
   
   For the in-memory mailbox, one can simply check whether a metadata block has been received to tell whether it is completed (as you will never lose a block on the `ArrayBlockingQueue`).
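   
   A minimal sketch of that idea, not the PR's implementation. `InMemoryReceiverSketch` and its nested `Block` type are hypothetical stand-ins (the real code would use `TransferableBlock` and its own end-of-stream check); the only assumption is that the final metadata block can be recognized when it is polled from the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical receiver: completion is inferred from the blocks themselves,
// so no separate "completed" flag has to be set by the sender.
final class InMemoryReceiverSketch {

  // Hypothetical stand-in for a transferable block.
  static final class Block {
    final int numRows;
    final boolean endOfStream; // true only for the final metadata block

    Block(int numRows, boolean endOfStream) {
      this.numRows = numRows;
      this.endOfStream = endOfStream;
    }
  }

  private final ArrayBlockingQueue<Block> _queue;
  private boolean _closed = false;

  InMemoryReceiverSketch(ArrayBlockingQueue<Block> queue) {
    _queue = queue;
  }

  // Returns the next block, or null if nothing arrived within the timeout
  // or the stream has already ended.
  Block receive(long timeoutMs) {
    if (_closed) {
      return null;
    }
    try {
      Block block = _queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (block != null && block.endOfStream) {
        // The queue never drops elements, so seeing the final metadata block
        // is a reliable completion signal.
        _closed = true;
      }
      return block;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  boolean isClosed() {
    return _closed;
  }
}
```

   With this shape the sender only needs to enqueue its final metadata block; the receiver's `isClosed()` flips as a side effect of polling it.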



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -64,6 +65,12 @@ public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
     _isEnabledFeedback = isEnabledFeedback;
   }
 
+  /**
+   * This method may return null. It can happen if there's a large enough gap between the time the receiver received
+   * the last block and the StreamObserver was detected as complete. In that case, the MailboxReceiveOperator would
+   * try to call this method again but since there's no new content to be received, we'll exit the loop. Returning
+   * null here means that MailboxReceiveOperator won't consider this as an error.
+   */

Review Comment:
   1. Yes, this is the correct behavior.
   2. We don't rely on this because we rely on `onCompleted()`.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.mailbox.channel.InMemoryChannel;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, InMemoryChannel<TransferableBlock>> _channelMap = new ConcurrentHashMap<>();

Review Comment:
   Let's just use an `ArrayBlockingQueue` here; it is simpler.
   Also see my other comment in the `InMemoryChannel` class.
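   
   Roughly what I have in mind, as a sketch only. `QueueRegistrySketch` and `getOrCreateQueue` are hypothetical names, and the capacity of 5 simply mirrors `DEFAULT_CHANNEL_CAPACITY` from the diff above; the point is that the map can hold the queue directly, with no wrapper class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry keyed by mailbox ID; each entry is the bare queue
// shared by the sending and receiving side of that mailbox.
final class QueueRegistrySketch<T> {
  // Mirrors DEFAULT_CHANNEL_CAPACITY in the diff; an assumption for this sketch.
  static final int CHANNEL_CAPACITY = 5;

  private final ConcurrentHashMap<String, ArrayBlockingQueue<T>> _queues = new ConcurrentHashMap<>();

  // Creates the queue on first access, so whichever side arrives first
  // (sender or receiver) ends up with the same instance as the other side.
  ArrayBlockingQueue<T> getOrCreateQueue(String mailboxId) {
    return _queues.computeIfAbsent(mailboxId, id -> new ArrayBlockingQueue<>(CHANNEL_CAPACITY));
  }
}
```

   `computeIfAbsent` on a `ConcurrentHashMap` is atomic per key, so two threads asking for the same mailbox ID cannot end up with different queues.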



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -50,15 +58,26 @@ public void init(MailboxContentStreamObserver streamObserver) {
     }
   }
 
+  /**
+   * Polls the underlying channel and converts the received data into a TransferableBlock. This may return null in the
+   * following cases:
+   *
+   * <p>
+   *  1. If the mailbox hasn't initialized yet. This means we haven't received any data yet.
+   *  2. If the received block from the sender didn't have any rows.

Review Comment:
   If the received block is a metadata block, it will still return a metadata block with 0 rows, right? Shouldn't this be:
   
   ```suggestion
      *  2. If the received block from the sender is a data block but has 0 rows.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

