ankitsultana commented on code in PR #10791:
URL: https://github.com/apache/pinot/pull/10791#discussion_r1204677238
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -51,12 +53,23 @@ public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
     return distributedStagePlan;
   }
-  public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) {
+  public static List<DistributedStagePlan> deserialize(Worker.QueryRequest request) {
+    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+    for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+      distributedStagePlans.add(deserialize(stagePlan));
+    }
+    return distributedStagePlans;
+  }
+
+  public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId,
+      VirtualServerAddress serverAddress) {
Review Comment:
I left a comment on the call site in QueryDispatcher, where serialize is
called (num-workers * num-stages) times per query.
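To make that call count concrete, here is a minimal cost model of the loop shape in QueryDispatcher.submit after this PR. It assumes one Worker.QueryRequest builder per (stage, server) and one serialize call per worker, which is how the QueryDispatcher hunk reads. DispatchLoopModel and its counters are hypothetical names, and the split of 256 workers over 16 servers is an assumption for illustration, not a measurement.

```java
// Hypothetical cost model (not Pinot code) of the nested dispatch loop:
// stages -> servers -> workers, mirroring the QueryDispatcher hunk.
final class DispatchLoopModel {
  long requestsBuilt = 0;  // one Worker.QueryRequest builder per (stage, server)
  long serializeCalls = 0; // one QueryPlanSerDeUtils.serialize call per (stage, worker)

  void run(int numStages, int numServers, int workersPerServer) {
    for (int stageId = 0; stageId < numStages; stageId++) {
      for (int server = 0; server < numServers; server++) {
        requestsBuilt++;
        for (int workerId = 0; workerId < workersPerServer; workerId++) {
          serializeCalls++;
        }
      }
    }
  }

  public static void main(String[] args) {
    DispatchLoopModel model = new DispatchLoopModel();
    // Assumed shape: 10 stages, 256 workers spread over 16 servers (16 each).
    model.run(10, 16, 16);
    // prints "160 requests, 2560 serialize calls"
    System.out.println(model.requestsBuilt + " requests, " + model.serializeCalls + " serialize calls");
  }
}
```

Batching per server reduces the number of request builders, but the serialize count still scales with num-workers × num-stages, which is the cost this comment is about.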
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -139,22 +140,24 @@ int submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeout
       for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
           : dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) {
         QueryServerInstance queryServerInstance = queryServerEntry.getKey();
+        Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
+        String host = queryServerInstance.getHostname();
+        int servicePort = queryServerInstance.getQueryServicePort();
+        int mailboxPort = queryServerInstance.getQueryMailboxPort();
         for (int workerId : queryServerEntry.getValue()) {
-          String host = queryServerInstance.getHostname();
-          int servicePort = queryServerInstance.getQueryServicePort();
-          int mailboxPort = queryServerInstance.getQueryMailboxPort();
           VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId);
-          DispatchClient client = getOrCreateDispatchClient(host, servicePort);
           dispatchCalls++;
-          int finalStageId = stageId;
-          _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-              QueryPlanSerDeUtils.serialize(
-                  constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress)))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
-              .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline,
-              dispatchCallbacks::offer));
+          queryRequestBuilder.addStagePlan(
+              QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, virtualServerAddress));
Review Comment:
This would serialize the entire sub plan for each (worker, stageId)
combination. Large use cases usually have 256 partitions (i.e. workers) and
~10 stages, so we'd call this roughly 2,500 times per query (256 × 10 = 2,560).
For low-QPS use cases this should be fine, but higher-QPS use cases might
start getting bottlenecked on serialization.
@walterddr: I remember we discussed this a couple of months ago. Are we
planning to fix this anytime soon? cc: @xiangfu0
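One possible mitigation, sketched under assumptions: if the plan could be split into a worker-independent stage body and a small worker-specific part, the stage body could be serialized once per stage and reused for every worker. Per the diff, the VirtualServerAddress is currently passed into serialize, so this split is an assumption, not how Worker.StagePlan is structured today. StagePlanCache, WorkerRequest and CachedDispatchSketch are hypothetical names, and the byte[] body stands in for the real protobuf payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Pinot code): memoize the expensive, worker-independent
// part of each stage plan so it is serialized once per stage, not once per worker.
final class StagePlanCache {
  private final Map<Integer, byte[]> cache = new HashMap<>();
  int serializeCount = 0; // how many times the expensive path actually ran

  // Stand-in for serializing a stage's plan tree; the real payload would be protobuf.
  private byte[] serializeStageBody(int stageId) {
    serializeCount++;
    return ("stage-plan-" + stageId).getBytes(StandardCharsets.UTF_8);
  }

  // Serializes a stage at most once; later calls reuse the cached bytes.
  byte[] getOrSerialize(int stageId) {
    return cache.computeIfAbsent(stageId, this::serializeStageBody);
  }
}

// Per-worker envelope: shared stage bytes plus worker-specific fields
// (a real one would also carry the host and mailbox port).
final class WorkerRequest {
  final int stageId;
  final byte[] stageBody; // same array reference for every worker of a stage
  final int workerId;

  WorkerRequest(int stageId, byte[] stageBody, int workerId) {
    this.stageId = stageId;
    this.stageBody = stageBody;
    this.workerId = workerId;
  }
}

final class CachedDispatchSketch {
  // Builds one request per (stage, worker) but serializes each stage only once.
  static List<WorkerRequest> buildRequests(StagePlanCache cache, int numStages, int numWorkers) {
    List<WorkerRequest> requests = new ArrayList<>();
    for (int stageId = 0; stageId < numStages; stageId++) {
      for (int workerId = 0; workerId < numWorkers; workerId++) {
        requests.add(new WorkerRequest(stageId, cache.getOrSerialize(stageId), workerId));
      }
    }
    return requests;
  }

  public static void main(String[] args) {
    StagePlanCache cache = new StagePlanCache();
    List<WorkerRequest> requests = buildRequests(cache, 10, 256);
    // prints "2560 requests, 10 serializations"
    System.out.println(requests.size() + " requests, " + cache.serializeCount + " serializations");
  }
}
```

This only removes repeated serialization CPU on the broker; each worker's request still carries the bytes over the wire. The cache uses a plain HashMap, so if requests were built from multiple threads it would need to become a ConcurrentHashMap.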