This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eb40796057 Move time-series constants into CommonConstants (#16673)
8eb40796057 is described below

commit 8eb40796057384216be663ba8b927e4151ba951d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sat Aug 23 19:11:56 2025 -0700

    Move time-series constants into CommonConstants (#16673)
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  78 +++++----
 .../query/service/dispatch/QueryDispatcher.java    | 180 +++++++++------------
 .../timeseries/TimeSeriesDispatchObserver.java     |  15 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  29 ++++
 .../tsdb/planner/TimeSeriesPlanConstants.java      |  58 -------
 5 files changed, 150 insertions(+), 210 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 4d21dd8ae41..0c67d30676c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.runtime;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -84,14 +83,15 @@ import org.apache.pinot.spi.executor.MetricsExecutor;
 import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
-import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
 import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Response;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.sql.parsers.rewriter.RlsUtils;
-import 
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
-import 
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
 import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -152,12 +152,12 @@ public class QueryRunner {
    */
   public void init(PinotConfiguration serverConf, InstanceDataManager 
instanceDataManager,
       @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
-    String hostname = 
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
-    if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
-      hostname = 
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
+    String hostname = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
+    if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+      hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
     }
-    int port = 
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
-        CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
+    int port = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+        MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
 
     // TODO: Consider using separate config for intermediate stage and leaf 
stage
     String numGroupsLimitStr = 
serverConf.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -184,18 +184,16 @@ public class QueryRunner {
     _mseMaxInitialResultHolderCapacity =
         mseMaxInitialGroupHolderCapacity != null ? 
Integer.parseInt(mseMaxInitialGroupHolderCapacity) : null;
 
-    String maxRowsInJoinStr = 
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
+    String maxRowsInJoinStr = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
     _maxRowsInJoin = maxRowsInJoinStr != null ? 
Integer.parseInt(maxRowsInJoinStr) : null;
 
-    String joinOverflowModeStr =
-        
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
+    String joinOverflowModeStr = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
     _joinOverflowMode = joinOverflowModeStr != null ? 
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
 
-    String maxRowsInWindowStr = 
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_WINDOW);
+    String maxRowsInWindowStr = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_WINDOW);
     _maxRowsInWindow = maxRowsInWindowStr != null ? 
Integer.parseInt(maxRowsInWindowStr) : null;
 
-    String windowOverflowModeStr =
-        
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_WINDOW_OVERFLOW_MODE);
+    String windowOverflowModeStr = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_WINDOW_OVERFLOW_MODE);
     _windowOverflowMode = windowOverflowModeStr != null ? 
WindowOverFlowMode.valueOf(windowOverflowModeStr) : null;
 
     ExecutorService baseExecutorService =
@@ -211,17 +209,15 @@ public class QueryRunner {
 
     int hardLimit = 
HardLimitExecutor.getMultiStageExecutorHardLimit(serverConf);
     if (hardLimit > 0) {
-      String strategyStr = serverConf.getProperty(
-          
CommonConstants.Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY,
-          
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+      String strategyStr = 
serverConf.getProperty(Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY,
+          Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
       QueryThreadExceedStrategy exceedStrategy;
       try {
         exceedStrategy = 
QueryThreadExceedStrategy.valueOf(strategyStr.toUpperCase());
       } catch (IllegalArgumentException e) {
         LOGGER.error("Invalid exceed strategy: {}, using default: {}", 
strategyStr,
-            
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
-        exceedStrategy = QueryThreadExceedStrategy.valueOf(
-            
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+            Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+        exceedStrategy = 
QueryThreadExceedStrategy.valueOf(Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
       }
 
       LOGGER.info("Setting multi-stage executor hardLimit: {} exceedStrategy: 
{}", hardLimit, exceedStrategy);
@@ -306,9 +302,9 @@ public class QueryRunner {
     }
 
     // run OpChain
-    OpChainExecutionContext executionContext = 
OpChainExecutionContext.fromQueryContext(_mailboxService,
-        opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult, 
parentContext,
-        _sendStats.getAsBoolean());
+    OpChainExecutionContext executionContext =
+        OpChainExecutionContext.fromQueryContext(_mailboxService, 
opChainMetadata, stageMetadata, workerMetadata,
+            pipelineBreakerResult, parentContext, _sendStats.getAsBoolean());
     OpChain opChain;
     if (workerMetadata.isLeafStageWorker()) {
       Map<String, String> rlsFilters = 
RlsUtils.extractRlsFilters(requestMetadata);
@@ -327,8 +323,8 @@ public class QueryRunner {
     }
   }
 
-  private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock 
errorBlock,
-      WorkerMetadata workerMetadata, StagePlan stagePlan) {
+  private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock 
errorBlock, WorkerMetadata workerMetadata,
+      StagePlan stagePlan) {
     long requestId = QueryThreadContext.getRequestId();
     LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, 
sending error block: {}", requestId,
         stageId, errorBlock);
@@ -377,10 +373,10 @@ public class QueryRunner {
       try {
         String planId = pair.getRight();
         Map<String, String> errorMetadata = new HashMap<>();
-        errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, 
t.getClass().getSimpleName());
-        errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE,
+        errorMetadata.put(Response.MetadataKeys.TimeSeries.ERROR_TYPE, 
t.getClass().getSimpleName());
+        errorMetadata.put(Response.MetadataKeys.TimeSeries.ERROR_MESSAGE,
             t.getMessage() == null ? "Unknown error: no message" : 
t.getMessage());
-        errorMetadata.put(WorkerResponseMetadataKeys.PLAN_ID, planId);
+        errorMetadata.put(Response.MetadataKeys.TimeSeries.PLAN_ID, planId);
         // TODO(timeseries): remove logging for failed queries.
         LOGGER.warn("time-series query failed:", t);
         
responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
@@ -401,8 +397,9 @@ public class QueryRunner {
       List<BaseTimeSeriesPlanNode> fragmentRoots =
           
serializedPlanFragments.stream().map(TimeSeriesPlanSerde::deserialize).collect(Collectors.toList());
       TimeSeriesExecutionContext context =
-          new 
TimeSeriesExecutionContext(metadata.get(WorkerRequestMetadataKeys.LANGUAGE), 
extractTimeBuckets(metadata),
-              deadlineMs, metadata, extractPlanToSegmentMap(metadata), 
Collections.emptyMap());
+          new 
TimeSeriesExecutionContext(metadata.get(Request.MetadataKeys.TimeSeries.LANGUAGE),
+              extractTimeBuckets(metadata), deadlineMs, metadata, 
extractPlanToSegmentMap(metadata),
+              Collections.emptyMap());
       final List<BaseTimeSeriesOperator> fragmentOpChains = 
fragmentRoots.stream().map(x -> {
         return _timeSeriesPhysicalPlanVisitor.compile(x, context);
       }).collect(Collectors.toList());
@@ -416,7 +413,7 @@ public class QueryRunner {
             TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
             Worker.TimeSeriesResponse response = 
Worker.TimeSeriesResponse.newBuilder()
                 
.setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
-                
.putAllMetadata(ImmutableMap.of(WorkerResponseMetadataKeys.PLAN_ID, 
currentPlanId))
+                
.putAllMetadata(Map.of(Response.MetadataKeys.TimeSeries.PLAN_ID, currentPlanId))
                 .build();
             responseObserver.onNext(response);
           }
@@ -548,8 +545,9 @@ public class QueryRunner {
       }
     };
     // compile OpChain
-    OpChainExecutionContext executionContext = 
OpChainExecutionContext.fromQueryContext(_mailboxService,
-        opChainMetadata, stageMetadata, workerMetadata, null, null, false);
+    OpChainExecutionContext executionContext =
+        OpChainExecutionContext.fromQueryContext(_mailboxService, 
opChainMetadata, stageMetadata, workerMetadata, null,
+            null, false);
 
     OpChain opChain =
         ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, 
_leafQueryExecutor, _executorService,
@@ -585,21 +583,21 @@ public class QueryRunner {
   // Time series related utility methods below
 
   private long extractDeadlineMs(Map<String, String> metadataMap) {
-    return 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.DEADLINE_MS));
+    return 
Long.parseLong(metadataMap.get(Request.MetadataKeys.TimeSeries.DEADLINE_MS));
   }
 
   private TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
-    long startTimeSeconds = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
-    long windowSeconds = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
-    int numElements = 
Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
+    long startTimeSeconds = 
Long.parseLong(metadataMap.get(Request.MetadataKeys.TimeSeries.START_TIME_SECONDS));
+    long windowSeconds = 
Long.parseLong(metadataMap.get(Request.MetadataKeys.TimeSeries.WINDOW_SECONDS));
+    int numElements = 
Integer.parseInt(metadataMap.get(Request.MetadataKeys.TimeSeries.NUM_ELEMENTS));
     return TimeBuckets.ofSeconds(startTimeSeconds, 
Duration.ofSeconds(windowSeconds), numElements);
   }
 
   private Map<String, List<String>> extractPlanToSegmentMap(Map<String, 
String> metadataMap) {
     Map<String, List<String>> result = new HashMap<>();
     for (var entry : metadataMap.entrySet()) {
-      if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
-        String planId = 
WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
+      if (Request.MetadataKeys.TimeSeries.isKeySegmentList(entry.getKey())) {
+        String planId = 
Request.MetadataKeys.TimeSeries.decodeSegmentListKey(entry.getKey());
         String[] segments = entry.getValue().split(",");
         result.put(planId, 
Stream.of(segments).map(String::strip).collect(Collectors.toList()));
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 2848054dee8..c7e4bd5c9ba 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -29,7 +29,6 @@ import java.io.DataInputStream;
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -94,9 +93,11 @@ import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.spi.utils.CommonConstants;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.PlanVersions;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import 
org.apache.pinot.spi.utils.CommonConstants.Query.Response.ServerResponseStatus;
 import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
-import 
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
 import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
 import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -122,8 +123,8 @@ public class QueryDispatcher {
   private final TlsConfig _tlsConfig;
   // maps broker-generated query id to the set of servers that the query was 
dispatched to
   private final Map<Long, Set<QueryServerInstance>> _serversByQuery;
-  private final PhysicalTimeSeriesBrokerPlanVisitor 
_timeSeriesBrokerPlanVisitor
-      = new PhysicalTimeSeriesBrokerPlanVisitor();
+  private final PhysicalTimeSeriesBrokerPlanVisitor 
_timeSeriesBrokerPlanVisitor =
+      new PhysicalTimeSeriesBrokerPlanVisitor();
   private final FailureDetector _failureDetector;
   private final Duration _cancelTimeout;
 
@@ -223,19 +224,17 @@ public class QueryDispatcher {
     long requestId = context.getRequestId();
     List<PlanNode> planNodes = new ArrayList<>();
 
-    Set<DispatchablePlanFragment> plans = Collections.singleton(fragment);
+    Set<DispatchablePlanFragment> plans = Set.of(fragment);
     Set<QueryServerInstance> servers = new HashSet<>();
     try {
       SendRequest<Worker.QueryRequest, List<Worker.ExplainResponse>> 
requestSender = DispatchClient::explain;
       execute(requestId, plans, timeoutMs, queryOptions, requestSender, 
servers, (responses, serverInstance) -> {
         for (Worker.ExplainResponse response : responses) {
-          if 
(response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR))
 {
+          if (response.containsMetadata(ServerResponseStatus.STATUS_ERROR)) {
             cancel(requestId, servers);
             throw new RuntimeException(
                 String.format("Unable to explain query plan for request: %d on 
server: %s, ERROR: %s", requestId,
-                    serverInstance,
-                    
response.getMetadataOrDefault(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
-                        "null")));
+                    serverInstance, 
response.getMetadataOrDefault(ServerResponseStatus.STATUS_ERROR, "null")));
           }
           for (Worker.StagePlan stagePlan : response.getStagePlanList()) {
             try {
@@ -244,8 +243,8 @@ public class QueryDispatcher {
               planNodes.add(PlanNodeDeserializer.process(planNode));
             } catch (InvalidProtocolBufferException e) {
               cancel(requestId, servers);
-              throw new RuntimeException("Failed to parse explain plan node 
for request " + requestId + " from server "
-                  + serverInstance, e);
+              throw new RuntimeException(
+                  "Failed to parse explain plan node for request " + requestId 
+ " from server " + serverInstance, e);
             }
           }
         }
@@ -259,23 +258,20 @@ public class QueryDispatcher {
   }
 
   @VisibleForTesting
-  void submit(
-      long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, 
Set<QueryServerInstance> serversOut,
-      Map<String, String> queryOptions)
+  void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long 
timeoutMs,
+      Set<QueryServerInstance> serversOut, Map<String, String> queryOptions)
       throws Exception {
     SendRequest<Worker.QueryRequest, Worker.QueryResponse> requestSender = 
DispatchClient::submit;
     Set<DispatchablePlanFragment> plansWithoutRoot = 
dispatchableSubPlan.getQueryStagesWithoutRoot();
     execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, 
requestSender, serversOut,
         (response, serverInstance) -> {
-      if 
(response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR))
 {
-        cancel(requestId, serversOut);
-        throw new RuntimeException(
-            String.format("Unable to execute query plan for request: %d on 
server: %s, ERROR: %s", requestId,
-                serverInstance,
-                
response.getMetadataOrDefault(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
-                    "null")));
-      }
-    });
+          if (response.containsMetadata(ServerResponseStatus.STATUS_ERROR)) {
+            cancel(requestId, serversOut);
+            throw new RuntimeException(
+                String.format("Unable to execute query plan for request: %d on 
server: %s, ERROR: %s", requestId,
+                    serverInstance, 
response.getMetadataOrDefault(ServerResponseStatus.STATUS_ERROR, "null")));
+          }
+        });
     if (isQueryCancellationEnabled()) {
       _serversByQuery.put(requestId, serversOut);
     }
@@ -308,17 +304,13 @@ public class QueryDispatcher {
     return _serversByQuery != null;
   }
 
-  private <E> void execute(long requestId, Set<DispatchablePlanFragment> 
stagePlans,
-      long timeoutMs, Map<String, String> queryOptions,
-      SendRequest<Worker.QueryRequest, E> sendRequest, 
Set<QueryServerInstance> serverInstancesOut,
-      BiConsumer<E, QueryServerInstance> resultConsumer)
+  private <E> void execute(long requestId, Set<DispatchablePlanFragment> 
stagePlans, long timeoutMs,
+      Map<String, String> queryOptions, SendRequest<Worker.QueryRequest, E> 
sendRequest,
+      Set<QueryServerInstance> serverInstancesOut, BiConsumer<E, 
QueryServerInstance> resultConsumer)
       throws ExecutionException, InterruptedException, TimeoutException {
-
     Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
 
-    Map<DispatchablePlanFragment, StageInfo> stageInfos =
-        serializePlanFragments(stagePlans, serverInstancesOut, deadline);
-
+    Map<DispatchablePlanFragment, StageInfo> stageInfos = 
serializePlanFragments(stagePlans, serverInstancesOut);
     if (serverInstancesOut.isEmpty()) {
       return;
     }
@@ -372,10 +364,8 @@ public class QueryDispatcher {
           // subsequent query failures
           if 
(getOrCreateDispatchClient(resp.getServerInstance()).getChannel().getState(false)
               != ConnectivityState.READY) {
-            _failureDetector.markServerUnhealthy(
-                resp.getServerInstance().getInstanceId(),
-                resp.getServerInstance().getHostname()
-            );
+            
_failureDetector.markServerUnhealthy(resp.getServerInstance().getInstanceId(),
+                resp.getServerInstance().getHostname());
           }
           throw new RuntimeException(
               String.format("Error dispatching query: %d to server: %s", 
requestId, resp.getServerInstance()),
@@ -399,24 +389,24 @@ public class QueryDispatcher {
       RequestContext requestContext, String instanceId) {
     Map<String, String> result = new HashMap<>();
     TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
-    result.put(WorkerRequestMetadataKeys.LANGUAGE, 
dispatchablePlan.getLanguage());
-    result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS, 
Long.toString(timeBuckets.getTimeBuckets()[0]));
-    result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, 
Long.toString(timeBuckets.getBucketSize().getSeconds()));
-    result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, 
Long.toString(timeBuckets.getTimeBuckets().length));
-    result.put(WorkerRequestMetadataKeys.DEADLINE_MS, 
Long.toString(deadlineMs));
+    result.put(MetadataKeys.TimeSeries.LANGUAGE, 
dispatchablePlan.getLanguage());
+    result.put(MetadataKeys.TimeSeries.START_TIME_SECONDS, 
Long.toString(timeBuckets.getTimeBuckets()[0]));
+    result.put(MetadataKeys.TimeSeries.WINDOW_SECONDS, 
Long.toString(timeBuckets.getBucketSize().getSeconds()));
+    result.put(MetadataKeys.TimeSeries.NUM_ELEMENTS, 
Long.toString(timeBuckets.getTimeBuckets().length));
+    result.put(MetadataKeys.TimeSeries.DEADLINE_MS, Long.toString(deadlineMs));
     Map<String, List<String>> leafIdToSegments = 
dispatchablePlan.getLeafIdToSegmentsByInstanceId().get(instanceId);
     for (Map.Entry<String, List<String>> entry : leafIdToSegments.entrySet()) {
-      
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), 
String.join(",", entry.getValue()));
+      result.put(MetadataKeys.TimeSeries.encodeSegmentListKey(entry.getKey()), 
String.join(",", entry.getValue()));
     }
-    result.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
Long.toString(requestContext.getRequestId()));
-    result.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, 
requestContext.getBrokerId());
+    result.put(MetadataKeys.REQUEST_ID, 
Long.toString(requestContext.getRequestId()));
+    result.put(MetadataKeys.BROKER_ID, requestContext.getBrokerId());
     return result;
   }
 
   private static Worker.QueryRequest createRequest(QueryServerInstance 
serverInstance,
       Map<DispatchablePlanFragment, StageInfo> stageInfos, ByteString 
protoRequestMetadata) {
     Worker.QueryRequest.Builder requestBuilder = 
Worker.QueryRequest.newBuilder();
-    
requestBuilder.setVersion(CommonConstants.MultiStageQueryRunner.PlanVersions.V1);
+    requestBuilder.setVersion(PlanVersions.V1);
 
     for (Map.Entry<DispatchablePlanFragment, StageInfo> entry : 
stageInfos.entrySet()) {
       DispatchablePlanFragment stagePlan = entry.getKey();
@@ -433,13 +423,11 @@ public class QueryDispatcher {
 
         Worker.StagePlan requestStagePlan = Worker.StagePlan.newBuilder()
             .setRootNode(stageInfo._rootNode)
-            .setStageMetadata(
-                Worker.StageMetadata.newBuilder()
-                    .setStageId(stagePlan.getPlanFragment().getFragmentId())
-                    .addAllWorkerMetadata(protoWorkerMetadataList)
-                    .setCustomProperty(stageInfo._customProperty)
-                    .build()
-            )
+            .setStageMetadata(Worker.StageMetadata.newBuilder()
+                .setStageId(stagePlan.getPlanFragment().getFragmentId())
+                .addAllWorkerMetadata(protoWorkerMetadataList)
+                .setCustomProperty(stageInfo._customProperty)
+                .build())
             .build();
         requestBuilder.addStagePlan(requestStagePlan);
       }
@@ -451,19 +439,17 @@ public class QueryDispatcher {
   private static Map<String, String> prepareRequestMetadata(long requestId, 
String cid,
       Map<String, String> queryOptions, Deadline deadline) {
     Map<String, String> requestMetadata = new HashMap<>();
-    requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
Long.toString(requestId));
-    
requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID, 
cid);
-    
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
-        Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
-    
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
+    requestMetadata.put(MetadataKeys.REQUEST_ID, Long.toString(requestId));
+    requestMetadata.put(MetadataKeys.CORRELATION_ID, cid);
+    requestMetadata.put(QueryOptionKey.TIMEOUT_MS, 
Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
+    requestMetadata.put(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
         Long.toString(QueryThreadContext.getPassiveDeadlineMs() - 
QueryThreadContext.getActiveDeadlineMs()));
     requestMetadata.putAll(queryOptions);
     return requestMetadata;
   }
 
-  private Map<DispatchablePlanFragment, StageInfo> serializePlanFragments(
-      Set<DispatchablePlanFragment> stagePlans,
-      Set<QueryServerInstance> serverInstances, Deadline deadline)
+  private Map<DispatchablePlanFragment, StageInfo> 
serializePlanFragments(Set<DispatchablePlanFragment> stagePlans,
+      Set<QueryServerInstance> serverInstances)
       throws InterruptedException, ExecutionException {
     List<CompletableFuture<Pair<DispatchablePlanFragment, StageInfo>>> 
stageInfoFutures =
         new ArrayList<>(stagePlans.size());
@@ -530,7 +516,6 @@ public class QueryDispatcher {
     return true;
   }
 
-
   @Nullable
   private MultiStageQueryStats cancelWithStats(long requestId, @Nullable 
Set<QueryServerInstance> servers) {
     if (servers == null) {
@@ -539,8 +524,8 @@ public class QueryDispatcher {
 
     Deadline deadline = Deadline.after(_cancelTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
     SendRequest<Long, Worker.CancelResponse> sendRequest = 
DispatchClient::cancel;
-    BlockingQueue<AsyncResponse<Worker.CancelResponse>> dispatchCallbacks = 
dispatch(sendRequest, servers, deadline,
-        serverInstance -> requestId);
+    BlockingQueue<AsyncResponse<Worker.CancelResponse>> dispatchCallbacks =
+        dispatch(sendRequest, servers, deadline, serverInstance -> requestId);
 
     MultiStageQueryStats stats = MultiStageQueryStats.emptyStats(0);
     StatMap<BaseMailboxReceiveOperator.StatKey> rootStats = new 
StatMap<>(BaseMailboxReceiveOperator.StatKey.class);
@@ -549,8 +534,7 @@ public class QueryDispatcher {
       processResults(requestId, servers.size(), (response, server) -> {
         Map<Integer, ByteString> statsByStage = response.getStatsByStageMap();
         for (Map.Entry<Integer, ByteString> entry : statsByStage.entrySet()) {
-          try (InputStream is = entry.getValue().newInput();
-              DataInputStream dis = new DataInputStream(is)) {
+          try (InputStream is = entry.getValue().newInput(); DataInputStream 
dis = new DataInputStream(is)) {
             MultiStageQueryStats.StageStats.Closed closed = 
MultiStageQueryStats.StageStats.Closed.deserialize(dis);
             stats.mergeUpstream(entry.getKey(), closed);
           } catch (Exception e) {
@@ -585,19 +569,10 @@ public class QueryDispatcher {
   /// Concatenates the results of the sub-plan and returns a [QueryResult] 
with the concatenated result.
   ///
   /// This method assumes the caller thread is a query thread and therefore 
[QueryThreadContext] has been initialized.
-  private static QueryResult runReducer(
-      DispatchableSubPlan subPlan,
-      Map<String, String> queryOptions,
-      MailboxService mailboxService
-  ) {
-    return runReducer(
-        QueryThreadContext.getRequestId(),
-        subPlan,
-        QueryThreadContext.getActiveDeadlineMs(),
-        QueryThreadContext.getPassiveDeadlineMs(),
-        queryOptions,
-        mailboxService
-    );
+  private static QueryResult runReducer(DispatchableSubPlan subPlan, 
Map<String, String> queryOptions,
+      MailboxService mailboxService) {
+    return runReducer(QueryThreadContext.getRequestId(), subPlan, 
QueryThreadContext.getActiveDeadlineMs(),
+        QueryThreadContext.getPassiveDeadlineMs(), queryOptions, 
mailboxService);
   }
 
   /// Concatenates the results of the sub-plan and returns a [QueryResult] 
with the concatenated result.
@@ -607,26 +582,22 @@ public class QueryDispatcher {
   ///
   /// Remember that in MSE there is no actual reduce but rather a single stage 
that concatenates the results.
   @VisibleForTesting
-  public static QueryResult runReducer(long requestId,
-      DispatchableSubPlan subPlan,
-      long activeDeadlineMs,
-      long passiveDeadlineMs,
-      Map<String, String> queryOptions,
-      MailboxService mailboxService) {
+  public static QueryResult runReducer(long requestId, DispatchableSubPlan 
subPlan, long activeDeadlineMs,
+      long passiveDeadlineMs, Map<String, String> queryOptions, MailboxService 
mailboxService) {
     long startTimeMs = System.currentTimeMillis();
     // NOTE: Reduce stage is always stage 0
     DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
     PlanFragment planFragment = stagePlan.getPlanFragment();
     PlanNode rootNode = planFragment.getFragmentRoot();
     List<WorkerMetadata> workerMetadata = stagePlan.getWorkerMetadataList();
-    Preconditions.checkState(workerMetadata.size() == 1,
-        "Expecting single worker for reduce stage, got: %s", 
workerMetadata.size());
+    Preconditions.checkState(workerMetadata.size() == 1, "Expecting single 
worker for reduce stage, got: %s",
+        workerMetadata.size());
 
     StageMetadata stageMetadata = new StageMetadata(0, workerMetadata, 
stagePlan.getCustomProperties());
     ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
     OpChainExecutionContext executionContext =
-        new OpChainExecutionContext(mailboxService, requestId, 
activeDeadlineMs, passiveDeadlineMs,
-            queryOptions, stageMetadata, workerMetadata.get(0), null, 
parentContext, true);
+        new OpChainExecutionContext(mailboxService, requestId, 
activeDeadlineMs, passiveDeadlineMs, queryOptions,
+            stageMetadata, workerMetadata.get(0), null, parentContext, true);
 
     PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
     DataSchema sourceSchema = rootNode.getDataSchema();
@@ -643,9 +614,9 @@ public class QueryDispatcher {
     ArrayList<Object[]> resultRows = new ArrayList<>();
     MseBlock block;
     MultiStageQueryStats queryStats;
-    try (
-        QueryThreadContext.CloseableContext mseCloseableCtx = MseWorkerThreadContext.open();
-        OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) {
+    try (QueryThreadContext.CloseableContext mseCloseableCtx = MseWorkerThreadContext.open();
+        OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> {
+        })) {
       MseWorkerThreadContext.setStageId(0);
       MseWorkerThreadContext.setWorkerId(0);
       MultiStageOperator rootOperator = opChain.getRoot();
@@ -692,11 +663,10 @@ public class QueryDispatcher {
         error = queryExceptions.entrySet().iterator().next();
         errorMessage = "Received 1 error" + from + ": " + error.getValue();
       } else {
-        error = queryExceptions.entrySet().stream()
-            .max(QueryDispatcher::compareErrors)
-            .orElseThrow();
-        errorMessage = "Received " + queryExceptions.size() + " errors" + from + ". "
-                + "The one with highest priority is: " + error.getValue();
+        error = queryExceptions.entrySet().stream().max(QueryDispatcher::compareErrors).orElseThrow();
+        errorMessage =
+            "Received " + queryExceptions.size() + " errors" + from + ". " + "The one with highest priority is: "
+                + error.getValue();
       }
       QueryProcessingException processingEx = new QueryProcessingException(error.getKey().getId(), errorMessage);
       return new QueryResult(processingEx, queryStats, System.currentTimeMillis() - startTimeMs);
@@ -748,10 +718,11 @@ public class QueryDispatcher {
     Map<String, BlockingQueue<Object>> receiversByPlanId = new HashMap<>();
     populateConsumers(brokerFragment, receiversByPlanId);
     // Compile brokerFragment to get operators
-    TimeSeriesExecutionContext brokerExecutionContext = new TimeSeriesExecutionContext(plan.getLanguage(),
-        plan.getTimeBuckets(), deadlineMs, Collections.emptyMap(), Collections.emptyMap(), receiversByPlanId);
-    BaseTimeSeriesOperator brokerOperator = _timeSeriesBrokerPlanVisitor.compile(brokerFragment,
-        brokerExecutionContext, plan.getNumInputServersForExchangePlanNode());
+    TimeSeriesExecutionContext brokerExecutionContext =
+        new TimeSeriesExecutionContext(plan.getLanguage(), plan.getTimeBuckets(), deadlineMs, Map.of(), Map.of(),
+            receiversByPlanId);
+    BaseTimeSeriesOperator brokerOperator = _timeSeriesBrokerPlanVisitor.compile(brokerFragment, brokerExecutionContext,
+        plan.getNumInputServersForExchangePlanNode());
     // Create dispatch observer for each query server
     for (TimeSeriesQueryServerInstance serverInstance : plan.getQueryServerInstances()) {
       String serverId = serverInstance.getInstanceId();
@@ -761,10 +732,9 @@ public class QueryDispatcher {
       Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
           .addAllDispatchPlan(plan.getSerializedServerFragments())
           .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId))
-          .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId))
+          .putMetadata(MetadataKeys.REQUEST_ID, Long.toString(requestId))
           .build();
-      TimeSeriesDispatchObserver
-          dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId);
+      TimeSeriesDispatchObserver dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId);
       getOrCreateTimeSeriesDispatchClient(serverInstance).submit(request, deadline, dispatchObserver);
     }
     // Execute broker fragment
@@ -853,7 +823,7 @@ public class QueryDispatcher {
   }
 
   private interface SendRequest<R, E> {
-    void send(DispatchClient dispatchClient, R request, QueryServerInstance serverInstance,
-        Deadline deadline, Consumer<AsyncResponse<E>> callbackConsumer);
+    void send(DispatchClient dispatchClient, R request, QueryServerInstance serverInstance, Deadline deadline,
+        Consumer<AsyncResponse<E>> callbackConsumer);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
index 599ce414c0c..f285418fc7b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
-import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys.TimeSeries;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,13 +50,13 @@ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSer
 
   @Override
   public void onNext(Worker.TimeSeriesResponse timeSeriesResponse) {
-    if (timeSeriesResponse.containsMetadata(WorkerResponseMetadataKeys.ERROR_TYPE)) {
-      String errorType = timeSeriesResponse.getMetadataOrDefault(WorkerResponseMetadataKeys.ERROR_TYPE, "");
-      String errorMessage = timeSeriesResponse.getMetadataOrDefault(WorkerResponseMetadataKeys.ERROR_MESSAGE, "");
+    if (timeSeriesResponse.containsMetadata(TimeSeries.ERROR_TYPE)) {
+      String errorType = timeSeriesResponse.getMetadataOrDefault(TimeSeries.ERROR_TYPE, "");
+      String errorMessage = timeSeriesResponse.getMetadataOrDefault(TimeSeries.ERROR_MESSAGE, "");
       onError(new Throwable(String.format("Error in server (type: %s): %s", errorType, errorMessage)));
       return;
     }
-    String planId = timeSeriesResponse.getMetadataMap().get(WorkerResponseMetadataKeys.PLAN_ID);
+    String planId = timeSeriesResponse.getMetadataMap().get(TimeSeries.PLAN_ID);
     TimeSeriesBlock block = null;
     Throwable error = null;
     try {
@@ -66,8 +66,9 @@ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSer
     }
     BlockingQueue<Object> receiverForPlanId = _exchangeReceiversByPlanId.get(planId);
     if (receiverForPlanId == null) {
-      String message = String.format("Receiver is not initialized for planId: %s. Receivers exist only for planIds: %s",
-          planId, _exchangeReceiversByPlanId.keySet());
+      String message =
+          String.format("Receiver is not initialized for planId: %s. Receivers exist only for planIds: %s", planId,
+              _exchangeReceiversByPlanId.keySet());
       LOGGER.warn(message);
       onError(new IllegalStateException(message));
     } else {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 10cf20e7ab4..5697eef08e6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1801,6 +1801,29 @@ public class CommonConstants {
         public static final String ENABLE_TRACE = "enableTrace";
         public static final String ENABLE_STREAMING = "enableStreaming";
         public static final String PAYLOAD_TYPE = "payloadType";
+
+        public static class TimeSeries {
+          public static final String LANGUAGE = "language";
+          public static final String START_TIME_SECONDS = "startTimeSeconds";
+          public static final String WINDOW_SECONDS = "windowSeconds";
+          public static final String NUM_ELEMENTS = "numElements";
+          public static final String DEADLINE_MS = "deadlineMs";
+
+          public static final String SEGMENT_MAP_ENTRY_PREFIX = "$segmentMapEntry#";
+
+          public static boolean isKeySegmentList(String key) {
+            return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);
+          }
+
+          public static String encodeSegmentListKey(String planId) {
+            return SEGMENT_MAP_ENTRY_PREFIX + planId;
+          }
+
+          /// Returns the plan-id corresponding to the encoded key.
+          public static String decodeSegmentListKey(String key) {
+            return key.substring(SEGMENT_MAP_ENTRY_PREFIX.length());
+          }
+        }
       }
 
       public static class PayloadType {
@@ -1812,6 +1835,12 @@ public class CommonConstants {
     public static class Response {
       public static class MetadataKeys {
         public static final String RESPONSE_TYPE = "responseType";
+
+        public static class TimeSeries {
+          public static final String PLAN_ID = "planId";
+          public static final String ERROR_TYPE = "errorType";
+          public static final String ERROR_MESSAGE = "error";
+        }
       }
 
       public static class ResponseType {
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
deleted file mode 100644
index eaf5b363861..00000000000
--- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tsdb.planner;
-
-public class TimeSeriesPlanConstants {
-  private TimeSeriesPlanConstants() {
-  }
-
-  public static class WorkerRequestMetadataKeys {
-    private static final String SEGMENT_MAP_ENTRY_PREFIX = "$segmentMapEntry#";
-
-    private WorkerRequestMetadataKeys() {
-    }
-
-    public static final String LANGUAGE = "language";
-    public static final String START_TIME_SECONDS = "startTimeSeconds";
-    public static final String WINDOW_SECONDS = "windowSeconds";
-    public static final String NUM_ELEMENTS = "numElements";
-    public static final String DEADLINE_MS = "deadlineMs";
-
-    public static boolean isKeySegmentList(String key) {
-      return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);
-    }
-
-    public static String encodeSegmentListKey(String planId) {
-      return SEGMENT_MAP_ENTRY_PREFIX + planId;
-    }
-
-    /**
-     * Returns the plan-id corresponding to the encoded key.
-     */
-    public static String decodeSegmentListKey(String key) {
-      return key.substring(SEGMENT_MAP_ENTRY_PREFIX.length());
-    }
-  }
-
-  public static class WorkerResponseMetadataKeys {
-    public static final String PLAN_ID = "planId";
-    public static final String ERROR_TYPE = "errorType";
-    public static final String ERROR_MESSAGE = "error";
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to