Copilot commented on code in PR #17474: URL: https://github.com/apache/pinot/pull/17474#discussion_r2670418337
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import com.google.common.base.Preconditions; +import io.grpc.Deadline; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor; +import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys; +import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import 
org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class TimeSeriesQueryDispatcher { + private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); + private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>(); + + public void start() { + } + + public void shutdown() { + for (TimeSeriesDispatchClient dispatchClient : _dispatchClientMap.values()) { + dispatchClient.getChannel().shutdown(); + } + _dispatchClientMap.clear(); + } + Review Comment: The `submitAndGet` method should have a Javadoc comment explaining its purpose, parameters, return value, and any exceptions it may throw. This is especially important for public API methods in complex query execution paths. ```suggestion /** * Submits a time series dispatchable plan to the configured query servers and returns the broker-side result block. * <p> * The method computes an absolute deadline from the given timeout, dispatches the serialized server fragments to * each {@link TimeSeriesQueryServerInstance}, and then executes the broker fragment locally using a compiled * {@link BaseTimeSeriesOperator}. The method blocks until the broker operator produces the first * {@link TimeSeriesBlock} or until the timeout elapses. 
* * @param requestId unique identifier for this query request, propagated in the metadata to downstream servers * @param plan dispatchable time series plan containing the broker fragment and serialized server fragments * @param timeoutMs timeout in milliseconds for the entire query execution, used to derive the deadline * @param requestContext request context carrying query- and trace-related metadata * @return the {@link TimeSeriesBlock} produced by the broker operator for this query; may be empty if no data * matches the query * @throws IllegalStateException if the computed deadline is already expired before the query can be sent to servers */ ``` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import com.google.common.base.Preconditions; +import io.grpc.Deadline; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor; +import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys; +import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class TimeSeriesQueryDispatcher { + private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); + private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>(); + + public void start() { + } + + public void shutdown() { + for (TimeSeriesDispatchClient dispatchClient : _dispatchClientMap.values()) { + dispatchClient.getChannel().shutdown(); + } + _dispatchClientMap.clear(); + } + + public TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, + RequestContext requestContext) { + long deadlineMs = System.currentTimeMillis() + timeoutMs; + BaseTimeSeriesPlanNode brokerFragment = plan.getBrokerFragment(); + Map<String, BlockingQueue<Object>> receiversByPlanId = new 
HashMap<>(); + populateConsumers(brokerFragment, receiversByPlanId); + TimeSeriesExecutionContext brokerExecutionContext = + new TimeSeriesExecutionContext(plan.getLanguage(), plan.getTimeBuckets(), deadlineMs, Map.of(), Map.of(), + receiversByPlanId); + BaseTimeSeriesOperator brokerOperator = + _planVisitor.compile(brokerFragment, brokerExecutionContext, plan.getNumInputServersForExchangePlanNode()); + for (TimeSeriesQueryServerInstance serverInstance : plan.getQueryServerInstances()) { + String serverId = serverInstance.getInstanceId(); + Deadline deadline = Deadline.after(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + Preconditions.checkState(!deadline.isExpired(), "Deadline expired before query could be sent to servers"); + Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder() + .addAllDispatchPlan(plan.getSerializedServerFragments()) + .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId)) + .putMetadata(MetadataKeys.REQUEST_ID, Long.toString(requestId)) + .build(); + TimeSeriesDispatchObserver dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId); + getOrCreateTimeSeriesDispatchClient(serverInstance).submit(request, deadline, dispatchObserver); + } + return brokerOperator.nextBlock(); + } + + private void populateConsumers(BaseTimeSeriesPlanNode planNode, Map<String, BlockingQueue<Object>> receiverMap) { + if (planNode instanceof TimeSeriesExchangeNode) { + receiverMap.put(planNode.getId(), new ArrayBlockingQueue<>(TimeSeriesDispatchObserver.MAX_QUEUE_CAPACITY)); + } + for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) { + populateConsumers(childNode, receiverMap); + } + } + + Map<String, String> initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan, long deadlineMs, + RequestContext requestContext, String instanceId) { + Map<String, String> result = new HashMap<>(); + TimeBuckets timeBuckets = 
dispatchablePlan.getTimeBuckets(); + result.put(MetadataKeys.TimeSeries.LANGUAGE, dispatchablePlan.getLanguage()); + result.put(MetadataKeys.TimeSeries.START_TIME_SECONDS, Long.toString(timeBuckets.getTimeBuckets()[0])); + result.put(MetadataKeys.TimeSeries.WINDOW_SECONDS, Long.toString(timeBuckets.getBucketSize().getSeconds())); + result.put(MetadataKeys.TimeSeries.NUM_ELEMENTS, Long.toString(timeBuckets.getTimeBuckets().length)); + result.put(MetadataKeys.TimeSeries.DEADLINE_MS, Long.toString(deadlineMs)); + Map<String, List<String>> leafIdToSegments = dispatchablePlan.getLeafIdToSegmentsByInstanceId().get(instanceId); + for (Map.Entry<String, List<String>> entry : leafIdToSegments.entrySet()) { + result.put(MetadataKeys.TimeSeries.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue())); + } + result.put(MetadataKeys.REQUEST_ID, Long.toString(requestContext.getRequestId())); + result.put(MetadataKeys.BROKER_ID, requestContext.getBrokerId()); + return result; + } + Review Comment: The `getOrCreateTimeSeriesDispatchClient` method should have a Javadoc comment explaining its caching behavior and the key format used for the client map. ```suggestion /** * Returns a cached {@link TimeSeriesDispatchClient} for the given server instance, creating it if necessary. * <p> * Clients are cached in {@link #_dispatchClientMap} and keyed by a string of the form * {@code "<hostname>_<port>"} where {@code <hostname>} is obtained from * {@link TimeSeriesQueryServerInstance#getHostname()} and {@code <port>} from * {@link TimeSeriesQueryServerInstance#getQueryServicePort()}. This ensures that all time series queries * targeting the same host and port share the same underlying gRPC channel, avoiding repeated connection setup. 
* * @param queryServerInstance the query server instance for which a dispatch client is required * @return a cached or newly created {@link TimeSeriesDispatchClient} associated with the given instance */ ``` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import com.google.common.base.Preconditions; +import io.grpc.Deadline; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor; +import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys; +import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class TimeSeriesQueryDispatcher { + private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); + private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>(); + + public void start() { + } + + public void shutdown() { + for (TimeSeriesDispatchClient dispatchClient : _dispatchClientMap.values()) { + dispatchClient.getChannel().shutdown(); + } + _dispatchClientMap.clear(); + } + + public TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, + RequestContext requestContext) { + long deadlineMs = System.currentTimeMillis() + timeoutMs; + BaseTimeSeriesPlanNode brokerFragment = plan.getBrokerFragment(); + Map<String, BlockingQueue<Object>> receiversByPlanId = new 
HashMap<>(); + populateConsumers(brokerFragment, receiversByPlanId); + TimeSeriesExecutionContext brokerExecutionContext = + new TimeSeriesExecutionContext(plan.getLanguage(), plan.getTimeBuckets(), deadlineMs, Map.of(), Map.of(), + receiversByPlanId); + BaseTimeSeriesOperator brokerOperator = + _planVisitor.compile(brokerFragment, brokerExecutionContext, plan.getNumInputServersForExchangePlanNode()); + for (TimeSeriesQueryServerInstance serverInstance : plan.getQueryServerInstances()) { + String serverId = serverInstance.getInstanceId(); + Deadline deadline = Deadline.after(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + Preconditions.checkState(!deadline.isExpired(), "Deadline expired before query could be sent to servers"); + Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder() + .addAllDispatchPlan(plan.getSerializedServerFragments()) + .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId)) + .putMetadata(MetadataKeys.REQUEST_ID, Long.toString(requestId)) + .build(); + TimeSeriesDispatchObserver dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId); + getOrCreateTimeSeriesDispatchClient(serverInstance).submit(request, deadline, dispatchObserver); + } + return brokerOperator.nextBlock(); + } + + private void populateConsumers(BaseTimeSeriesPlanNode planNode, Map<String, BlockingQueue<Object>> receiverMap) { + if (planNode instanceof TimeSeriesExchangeNode) { + receiverMap.put(planNode.getId(), new ArrayBlockingQueue<>(TimeSeriesDispatchObserver.MAX_QUEUE_CAPACITY)); + } + for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) { + populateConsumers(childNode, receiverMap); + } + } + Review Comment: The `initializeTimeSeriesMetadataMap` method should have a Javadoc comment documenting its parameters and return value, particularly explaining what metadata keys are being set and why. 
```suggestion /** * Initializes the metadata map attached to a time series query request for a specific query server instance. * <p> * The returned map contains: * <ul> * <li>{@link MetadataKeys.TimeSeries#LANGUAGE}: the query language used by the {@code dispatchablePlan}.</li> * <li>{@link MetadataKeys.TimeSeries#START_TIME_SECONDS}: the start time (epoch seconds) of the first time bucket.</li> * <li>{@link MetadataKeys.TimeSeries#WINDOW_SECONDS}: the size of each time bucket in seconds.</li> * <li>{@link MetadataKeys.TimeSeries#NUM_ELEMENTS}: the number of time buckets in the query.</li> * <li>{@link MetadataKeys.TimeSeries#DEADLINE_MS}: the absolute deadline for the request in milliseconds since epoch.</li> * <li>Segment list keys produced via {@link MetadataKeys.TimeSeries#encodeSegmentListKey(String)} for each leaf * plan node, whose values are CSV-encoded segment lists assigned to that leaf on the given {@code instanceId}.</li> * <li>{@link MetadataKeys#REQUEST_ID}: the logical request identifier from {@code requestContext}.</li> * <li>{@link MetadataKeys#BROKER_ID}: the broker identifier from {@code requestContext}.</li> * </ul> * * @param dispatchablePlan the dispatchable time series plan, used to derive time bucket configuration and segment * assignment per leaf for the target {@code instanceId} * @param deadlineMs the absolute deadline for the request, in milliseconds since epoch, propagated to the server * @param requestContext the request context providing broker-level metadata such as request and broker identifiers * @param instanceId the identifier of the target query server instance whose segment assignments should be encoded * @return a mutable {@link Map} of string metadata keys to string values used to describe the time series * query to the server */ ``` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.service.dispatch.timeseries; + +import com.google.common.base.Preconditions; +import io.grpc.Deadline; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor; +import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys; +import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +public class 
TimeSeriesQueryDispatcher { + private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); + private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>(); + + public void start() { Review Comment: The empty `start()` method should either be removed if it's not needed, or include a comment explaining why it's a no-op. If it's intended for future use, document that with a TODO or explanatory comment. ```suggestion public void start() { // No-op: TimeSeriesQueryDispatcher does not require explicit startup initialization. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
