walterddr commented on code in PR #10708:
URL: https://github.com/apache/pinot/pull/10708#discussion_r1184514389


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:
##########
@@ -60,10 +60,17 @@ public QueryPlan constructDispatchablePlan(StageNode globalReceiverNode,
     dispatchablePlanContext.getDispatchablePlanStageRootMap().put(0, globalReceiverNode);
     // 3. add worker assignment after the dispatchable plan context is fulfilled after the visit.
     computeWorkerAssignment(globalReceiverNode, dispatchablePlanContext);
-    // 4. convert it into query plan.
+    // 4. compute the mailbox assignment for each stage.
+    computeMailboxAssignment(dispatchablePlanContext);
+    // 5. convert it into query plan.

Review Comment:
   Let's add a TODO here for these two methods (`computeWorkerAssignment` and `computeMailboxAssignment`). Later we want to refactor them into pluggable algorithms (e.g. a worker assignment visitor and a mailbox assignment visitor).
   
   That said, the entry point already looks very clean, and I love it.
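
One possible shape for the "pluggable algorithms" idea, as a minimal sketch only. `SketchPlanContext`, `AssignmentStep`, and `AssignmentPipeline` are hypothetical names invented for illustration; they are not part of Pinot, and the real refactoring may well use the existing visitor classes instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DispatchablePlanContext; it only records which steps ran.
final class SketchPlanContext {
  final List<String> appliedSteps = new ArrayList<>();
}

// Hypothetical plug-in point: one implementation per assignment algorithm.
interface AssignmentStep {
  void assign(SketchPlanContext context);
}

// Runs the configured steps in order, e.g. worker assignment before mailbox assignment.
final class AssignmentPipeline {
  private final List<AssignmentStep> steps;

  AssignmentPipeline(List<AssignmentStep> steps) {
    // Defensive copy so later changes to the caller's list do not affect the pipeline.
    this.steps = new ArrayList<>(steps);
  }

  void run(SketchPlanContext context) {
    for (AssignmentStep step : steps) {
      step.assign(context);
    }
  }
}
```

With something like this, the entry point would keep its current two-step order while letting either step be swapped for a different implementation.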



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -122,7 +134,37 @@ private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMeta
   private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
     Worker.WorkerMetadata.Builder builder = Worker.WorkerMetadata.newBuilder();
     builder.setVirtualAddress(addressToProto(workerMetadata.getVirtualServerAddress()));
+    builder.putAllMailboxMetadata(toProtoMailboxMap(workerMetadata.getMailBoxInfosMap()));
     builder.putAllCustomProperty(workerMetadata.getCustomProperties());
     return builder.build();
   }
+
+  private static Map<Integer, Worker.MailboxMetadata> toProtoMailboxMap(
+      Map<Integer, List<MailboxMetadata>> mailBoxInfosMap) {
+    Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap = new HashMap<>();
+    for (Map.Entry<Integer, List<MailboxMetadata>> entry : mailBoxInfosMap.entrySet()) {
+      mailboxMetadataMap.put(entry.getKey(), toProtoMailbox(entry.getValue()));
+    }
+    return mailboxMetadataMap;
+  }
+
+  private static Worker.MailboxMetadata toProtoMailbox(List<MailboxMetadata> mailboxMetadataList) {
+    Worker.MailboxMetadata.Builder builder = Worker.MailboxMetadata.newBuilder();
+    for (MailboxMetadata mailboxMetadata : mailboxMetadataList) {
+      builder.addMailboxId(mailboxMetadata.getMailBoxId());
+      builder.addVirtualAddress(mailboxMetadata.getVirtualAddress().toString());
+      mailboxMetadata.getCustomProperties().entrySet().stream().forEach(
+          entry -> builder.putCustomProperty(
+              toMailboxCustomPropertiesKeyPrefix(mailboxMetadata.getMailBoxId(), entry.getKey()), entry.getValue()));
+    }
+    return builder.build();
+  }
+
+  private static String toMailboxCustomPropertiesKeyPrefix(String mailBoxId, String key) {
+    return String.format("%s.%s", mailBoxId, key);
+  }
+
+  private static String fromMailboxCustomPropertiesKeyPrefix(String mailBoxId, String key) {
+    return key.split(mailBoxId + "\\.")[1];
+  }

Review Comment:
   These mailbox-specific custom-property utils could live in the `MailboxMetadata` class, IMO, to keep it consistent with the Stage- and Worker-level metadata classes.
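
If the helpers do move, it may be worth revisiting the prefix-stripping logic at the same time. `String.split` treats its argument as a regular expression, so `key.split(mailBoxId + "\\.")[1]` can misbehave when the mailbox ID contains regex metacharacters such as `|`, and it throws if the prefix is absent. A minimal sketch of a literal-prefix alternative follows; the class name `MailboxPropertyKeys` and both method names are hypothetical, chosen only for illustration.

```java
import java.util.Optional;

// Hypothetical helper; the name and placement are assumptions, not the PR's code.
final class MailboxPropertyKeys {
  private static final char SEPARATOR = '.';

  private MailboxPropertyKeys() {
  }

  // Builds "<mailboxId>.<key>", the same format the diff above produces.
  static String toPrefixedKey(String mailboxId, String key) {
    return mailboxId + SEPARATOR + key;
  }

  // Strips the literal "<mailboxId>." prefix; returns empty if the key has a different prefix.
  static Optional<String> fromPrefixedKey(String mailboxId, String prefixedKey) {
    String prefix = mailboxId + SEPARATOR;
    return prefixedKey.startsWith(prefix)
        ? Optional.of(prefixedKey.substring(prefix.length()))
        : Optional.empty();
  }
}
```

Returning `Optional` is one design choice among several; throwing a descriptive exception on a missing prefix would work just as well if callers always expect a match.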



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -98,16 +93,33 @@ private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoSt
   private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
     WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
     builder.setVirtualServerAddress(protoToAddress(protoWorkerMetadata.getVirtualAddress()));
+    builder.putAllMailBoxInfosMap(protoToMailbox(protoWorkerMetadata.getMailboxMetadataMap()));
     builder.putAllCustomProperties(protoWorkerMetadata.getCustomPropertyMap());
     return builder.build();
   }
 
-  public static List<Worker.StageMetadata> stageMetadataListToProtoList(List<StageMetadata> stageMetadataList) {
-    List<Worker.StageMetadata> protoList = new ArrayList<>();
-    for (StageMetadata stageMetadata : stageMetadataList) {
-      protoList.add(toProtoStageMetadata(stageMetadata));
+  private static Map<Integer, List<MailboxMetadata>> protoToMailbox(

Review Comment:
   ```suggestion
     private static Map<Integer, List<MailboxMetadata>> fromProtoMailboxMap(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

