Copilot commented on code in PR #17474:
URL: https://github.com/apache/pinot/pull/17474#discussion_r2670418337


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import com.google.common.base.Preconditions;
+import io.grpc.Deadline;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
+import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TimeSeriesQueryDispatcher {
+  private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor();
+  private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
+
+  public void start() {
+  }
+
+  public void shutdown() {
+    for (TimeSeriesDispatchClient dispatchClient : _dispatchClientMap.values()) {
+      dispatchClient.getChannel().shutdown();
+    }
+    _dispatchClientMap.clear();
+  }
+

Review Comment:
   The `submitAndGet` method should have a Javadoc comment explaining its purpose, parameters, return value, and any exceptions it may throw. This is especially important for public API methods in complex query execution paths.
   ```suggestion
   
     /**
      * Submits a time series dispatchable plan to the configured query servers and returns the broker-side result block.
      * <p>
      * The method computes an absolute deadline from the given timeout, dispatches the serialized server fragments to
      * each {@link TimeSeriesQueryServerInstance}, and then executes the broker fragment locally using a compiled
      * {@link BaseTimeSeriesOperator}. The method blocks until the broker operator produces the first
      * {@link TimeSeriesBlock} or until the timeout elapses.
      *
      * @param requestId unique identifier for this query request, propagated in the metadata to downstream servers
      * @param plan dispatchable time series plan containing the broker fragment and serialized server fragments
      * @param timeoutMs timeout in milliseconds for the entire query execution, used to derive the deadline
      * @param requestContext request context carrying query- and trace-related metadata
      * @return the {@link TimeSeriesBlock} produced by the broker operator for this query; may be empty if no data
      *         matches the query
      * @throws IllegalStateException if the computed deadline is already expired before the query can be sent to servers
      */
   ```
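   For context, a minimal call-site sketch of how this method might be used (illustrative only, not the actual broker code in this PR; it assumes `requestId`, `dispatchablePlan`, and `requestContext` are already in scope, and the 10-second timeout is a made-up value):
   ```java
   TimeSeriesQueryDispatcher dispatcher = new TimeSeriesQueryDispatcher();
   dispatcher.start();
   try {
     // Blocks until the broker operator yields its first TimeSeriesBlock or the 10s budget is exhausted.
     TimeSeriesBlock result = dispatcher.submitAndGet(requestId, dispatchablePlan, 10_000L, requestContext);
   } finally {
     // Shuts down all cached gRPC channels.
     dispatcher.shutdown();
   }
   ```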



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import com.google.common.base.Preconditions;
+import io.grpc.Deadline;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
+import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TimeSeriesQueryDispatcher {
+  private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor();
+  private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
+
+  public void start() {
+  }
+
+  public void shutdown() {
+    for (TimeSeriesDispatchClient dispatchClient : _dispatchClientMap.values()) {
+      dispatchClient.getChannel().shutdown();
+    }
+    _dispatchClientMap.clear();
+  }
+
+  public TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs,
+      RequestContext requestContext) {
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    BaseTimeSeriesPlanNode brokerFragment = plan.getBrokerFragment();
+    Map<String, BlockingQueue<Object>> receiversByPlanId = new HashMap<>();
+    populateConsumers(brokerFragment, receiversByPlanId);
+    TimeSeriesExecutionContext brokerExecutionContext =
+        new TimeSeriesExecutionContext(plan.getLanguage(), plan.getTimeBuckets(), deadlineMs, Map.of(), Map.of(),
+            receiversByPlanId);
+    BaseTimeSeriesOperator brokerOperator =
+        _planVisitor.compile(brokerFragment, brokerExecutionContext, plan.getNumInputServersForExchangePlanNode());
+    for (TimeSeriesQueryServerInstance serverInstance : plan.getQueryServerInstances()) {
+      String serverId = serverInstance.getInstanceId();
+      Deadline deadline = Deadline.after(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      Preconditions.checkState(!deadline.isExpired(), "Deadline expired before query could be sent to servers");
+      Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
+          .addAllDispatchPlan(plan.getSerializedServerFragments())
+          .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId))
+          .putMetadata(MetadataKeys.REQUEST_ID, Long.toString(requestId))
+          .build();
+      TimeSeriesDispatchObserver dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId);
+      getOrCreateTimeSeriesDispatchClient(serverInstance).submit(request, deadline, dispatchObserver);
+    }
+    return brokerOperator.nextBlock();
+  }
+
+  private void populateConsumers(BaseTimeSeriesPlanNode planNode, Map<String, BlockingQueue<Object>> receiverMap) {
+    if (planNode instanceof TimeSeriesExchangeNode) {
+      receiverMap.put(planNode.getId(), new ArrayBlockingQueue<>(TimeSeriesDispatchObserver.MAX_QUEUE_CAPACITY));
+    }
+    for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) {
+      populateConsumers(childNode, receiverMap);
+    }
+  }
+
+  Map<String, String> initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan, long deadlineMs,
+      RequestContext requestContext, String instanceId) {
+    Map<String, String> result = new HashMap<>();
+    TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
+    result.put(MetadataKeys.TimeSeries.LANGUAGE, dispatchablePlan.getLanguage());
+    result.put(MetadataKeys.TimeSeries.START_TIME_SECONDS, Long.toString(timeBuckets.getTimeBuckets()[0]));
+    result.put(MetadataKeys.TimeSeries.WINDOW_SECONDS, Long.toString(timeBuckets.getBucketSize().getSeconds()));
+    result.put(MetadataKeys.TimeSeries.NUM_ELEMENTS, Long.toString(timeBuckets.getTimeBuckets().length));
+    result.put(MetadataKeys.TimeSeries.DEADLINE_MS, Long.toString(deadlineMs));
+    Map<String, List<String>> leafIdToSegments = dispatchablePlan.getLeafIdToSegmentsByInstanceId().get(instanceId);
+    for (Map.Entry<String, List<String>> entry : leafIdToSegments.entrySet()) {
+      result.put(MetadataKeys.TimeSeries.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue()));
+    }
+    result.put(MetadataKeys.REQUEST_ID, Long.toString(requestContext.getRequestId()));
+    result.put(MetadataKeys.BROKER_ID, requestContext.getBrokerId());
+    return result;
+  }
+

Review Comment:
   The `getOrCreateTimeSeriesDispatchClient` method should have a Javadoc comment explaining its caching behavior and the key format used for the client map.
   ```suggestion
   
     /**
      * Returns a cached {@link TimeSeriesDispatchClient} for the given server instance, creating it if necessary.
      * <p>
      * Clients are cached in {@link #_dispatchClientMap} and keyed by a string of the form
      * {@code "<hostname>_<port>"} where {@code <hostname>} is obtained from
      * {@link TimeSeriesQueryServerInstance#getHostname()} and {@code <port>} from
      * {@link TimeSeriesQueryServerInstance#getQueryServicePort()}. This ensures that all time series queries
      * targeting the same host and port share the same underlying gRPC channel, avoiding repeated connection setup.
      *
      * @param queryServerInstance the query server instance for which a dispatch client is required
      * @return a cached or newly created {@link TimeSeriesDispatchClient} associated with the given instance
      */
   ```
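   Since the method body is outside this hunk, here is only an illustrative sketch of the caching contract described above (the `TimeSeriesDispatchClient(hostname, port)` constructor signature is an assumption, not confirmed by this diff):
   ```java
   private TimeSeriesDispatchClient getOrCreateTimeSeriesDispatchClient(TimeSeriesQueryServerInstance serverInstance) {
     // Key format as documented: "<hostname>_<port>".
     String key = serverInstance.getHostname() + "_" + serverInstance.getQueryServicePort();
     // computeIfAbsent on the ConcurrentHashMap creates at most one client per host:port,
     // so concurrent queries to the same server share a single gRPC channel.
     return _dispatchClientMap.computeIfAbsent(key,
         k -> new TimeSeriesDispatchClient(serverInstance.getHostname(), serverInstance.getQueryServicePort()));
   }
   ```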



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import com.google.common.base.Preconditions;
+import io.grpc.Deadline;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
+import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TimeSeriesQueryDispatcher {
+  private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor();
+  private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
+
+  public void start() {
+  }
+
+  public void shutdown() {
+    for (TimeSeriesDispatchClient dispatchClient : _dispatchClientMap.values()) {
+      dispatchClient.getChannel().shutdown();
+    }
+    _dispatchClientMap.clear();
+  }
+
+  public TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs,
+      RequestContext requestContext) {
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    BaseTimeSeriesPlanNode brokerFragment = plan.getBrokerFragment();
+    Map<String, BlockingQueue<Object>> receiversByPlanId = new HashMap<>();
+    populateConsumers(brokerFragment, receiversByPlanId);
+    TimeSeriesExecutionContext brokerExecutionContext =
+        new TimeSeriesExecutionContext(plan.getLanguage(), plan.getTimeBuckets(), deadlineMs, Map.of(), Map.of(),
+            receiversByPlanId);
+    BaseTimeSeriesOperator brokerOperator =
+        _planVisitor.compile(brokerFragment, brokerExecutionContext, plan.getNumInputServersForExchangePlanNode());
+    for (TimeSeriesQueryServerInstance serverInstance : plan.getQueryServerInstances()) {
+      String serverId = serverInstance.getInstanceId();
+      Deadline deadline = Deadline.after(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      Preconditions.checkState(!deadline.isExpired(), "Deadline expired before query could be sent to servers");
+      Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
+          .addAllDispatchPlan(plan.getSerializedServerFragments())
+          .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId))
+          .putMetadata(MetadataKeys.REQUEST_ID, Long.toString(requestId))
+          .build();
+      TimeSeriesDispatchObserver dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId);
+      getOrCreateTimeSeriesDispatchClient(serverInstance).submit(request, deadline, dispatchObserver);
+    }
+    return brokerOperator.nextBlock();
+  }
+
+  private void populateConsumers(BaseTimeSeriesPlanNode planNode, Map<String, BlockingQueue<Object>> receiverMap) {
+    if (planNode instanceof TimeSeriesExchangeNode) {
+      receiverMap.put(planNode.getId(), new ArrayBlockingQueue<>(TimeSeriesDispatchObserver.MAX_QUEUE_CAPACITY));
+    }
+    for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) {
+      populateConsumers(childNode, receiverMap);
+    }
+  }
+

Review Comment:
   The `initializeTimeSeriesMetadataMap` method should have a Javadoc comment documenting its parameters and return value, particularly explaining what metadata keys are being set and why.
   ```suggestion
   
     /**
      * Initializes the metadata map attached to a time series query request for a specific query server instance.
      * <p>
      * The returned map contains:
      * <ul>
      *   <li>{@link MetadataKeys.TimeSeries#LANGUAGE}: the query language used by the {@code dispatchablePlan}.</li>
      *   <li>{@link MetadataKeys.TimeSeries#START_TIME_SECONDS}: the start time (epoch seconds) of the first time bucket.</li>
      *   <li>{@link MetadataKeys.TimeSeries#WINDOW_SECONDS}: the size of each time bucket in seconds.</li>
      *   <li>{@link MetadataKeys.TimeSeries#NUM_ELEMENTS}: the number of time buckets in the query.</li>
      *   <li>{@link MetadataKeys.TimeSeries#DEADLINE_MS}: the absolute deadline for the request in milliseconds since epoch.</li>
      *   <li>Segment list keys produced via {@link MetadataKeys.TimeSeries#encodeSegmentListKey(String)} for each leaf
      *       plan node, whose values are CSV-encoded segment lists assigned to that leaf on the given {@code instanceId}.</li>
      *   <li>{@link MetadataKeys#REQUEST_ID}: the logical request identifier from {@code requestContext}.</li>
      *   <li>{@link MetadataKeys#BROKER_ID}: the broker identifier from {@code requestContext}.</li>
      * </ul>
      *
      * @param dispatchablePlan the dispatchable time series plan, used to derive time bucket configuration and segment
      *                         assignment per leaf for the target {@code instanceId}
      * @param deadlineMs the absolute deadline for the request, in milliseconds since epoch, propagated to the server
      * @param requestContext the request context providing broker-level metadata such as request and broker identifiers
      * @param instanceId the identifier of the target query server instance whose segment assignments should be encoded
      * @return a {@link Map} of string metadata keys to string values used to describe the time series
      *         query to the server
      */
   ```
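   For reviewers, a quick test-style illustration of what the map carries (hypothetical sketch; the instance id, leaf id, and segment names are made up):
   ```java
   // initializeTimeSeriesMetadataMap is package-private, so this would live in a test in the same package.
   Map<String, String> metadata =
       dispatcher.initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, "Server_localhost_8098");
   // Time bucket configuration and the absolute deadline are propagated as strings:
   assert metadata.get(MetadataKeys.TimeSeries.DEADLINE_MS).equals(Long.toString(deadlineMs));
   // Each leaf plan node on this instance gets a CSV segment list entry, e.g.:
   //   encodeSegmentListKey("leaf_0") -> "segment_0,segment_1,segment_2"
   ```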



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesQueryDispatcher.java:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.timeseries;
+
+import com.google.common.base.Preconditions;
+import io.grpc.Deadline;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
+import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TimeSeriesQueryDispatcher {
+  private final PhysicalTimeSeriesBrokerPlanVisitor _planVisitor = new PhysicalTimeSeriesBrokerPlanVisitor();
+  private final Map<String, TimeSeriesDispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
+
+  public void start() {

Review Comment:
   The empty `start()` method should either be removed if it's not needed, or include a comment explaining why it's a no-op. If it's intended for future use, document that with a TODO or explanatory comment.
   ```suggestion
     public void start() {
       // No-op: TimeSeriesQueryDispatcher does not require explicit startup initialization.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

