This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8eb40796057 Move time-series constants into CommonConstants (#16673)
8eb40796057 is described below
commit 8eb40796057384216be663ba8b927e4151ba951d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sat Aug 23 19:11:56 2025 -0700
Move time-series constants into CommonConstants (#16673)
---
.../apache/pinot/query/runtime/QueryRunner.java | 78 +++++----
.../query/service/dispatch/QueryDispatcher.java | 180 +++++++++------------
.../timeseries/TimeSeriesDispatchObserver.java | 15 +-
.../apache/pinot/spi/utils/CommonConstants.java | 29 ++++
.../tsdb/planner/TimeSeriesPlanConstants.java | 58 -------
5 files changed, 150 insertions(+), 210 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 4d21dd8ae41..0c67d30676c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.ArrayList;
@@ -84,14 +83,15 @@ import org.apache.pinot.spi.executor.MetricsExecutor;
import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
-import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Response;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.sql.parsers.rewriter.RlsUtils;
-import
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
-import
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -152,12 +152,12 @@ public class QueryRunner {
*/
public void init(PinotConfiguration serverConf, InstanceDataManager
instanceDataManager,
@Nullable TlsConfig tlsConfig, BooleanSupplier sendStats,
ThreadResourceUsageAccountant resourceUsageAccountant) {
- String hostname =
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
- if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
- hostname =
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
+ String hostname =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
+ if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+ hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
}
- int port =
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
- CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
+ int port =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+ MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
// TODO: Consider using separate config for intermediate stage and leaf
stage
String numGroupsLimitStr =
serverConf.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -184,18 +184,16 @@ public class QueryRunner {
_mseMaxInitialResultHolderCapacity =
mseMaxInitialGroupHolderCapacity != null ?
Integer.parseInt(mseMaxInitialGroupHolderCapacity) : null;
- String maxRowsInJoinStr =
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
+ String maxRowsInJoinStr =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
_maxRowsInJoin = maxRowsInJoinStr != null ?
Integer.parseInt(maxRowsInJoinStr) : null;
- String joinOverflowModeStr =
-
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
+ String joinOverflowModeStr =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
_joinOverflowMode = joinOverflowModeStr != null ?
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
- String maxRowsInWindowStr =
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_WINDOW);
+ String maxRowsInWindowStr =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_WINDOW);
_maxRowsInWindow = maxRowsInWindowStr != null ?
Integer.parseInt(maxRowsInWindowStr) : null;
- String windowOverflowModeStr =
-
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_WINDOW_OVERFLOW_MODE);
+ String windowOverflowModeStr =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_WINDOW_OVERFLOW_MODE);
_windowOverflowMode = windowOverflowModeStr != null ?
WindowOverFlowMode.valueOf(windowOverflowModeStr) : null;
ExecutorService baseExecutorService =
@@ -211,17 +209,15 @@ public class QueryRunner {
int hardLimit =
HardLimitExecutor.getMultiStageExecutorHardLimit(serverConf);
if (hardLimit > 0) {
- String strategyStr = serverConf.getProperty(
-
CommonConstants.Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY,
-
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+ String strategyStr =
serverConf.getProperty(Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY,
+ Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
QueryThreadExceedStrategy exceedStrategy;
try {
exceedStrategy =
QueryThreadExceedStrategy.valueOf(strategyStr.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.error("Invalid exceed strategy: {}, using default: {}",
strategyStr,
-
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
- exceedStrategy = QueryThreadExceedStrategy.valueOf(
-
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+ Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+ exceedStrategy =
QueryThreadExceedStrategy.valueOf(Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
}
LOGGER.info("Setting multi-stage executor hardLimit: {} exceedStrategy:
{}", hardLimit, exceedStrategy);
@@ -306,9 +302,9 @@ public class QueryRunner {
}
// run OpChain
- OpChainExecutionContext executionContext =
OpChainExecutionContext.fromQueryContext(_mailboxService,
- opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult,
parentContext,
- _sendStats.getAsBoolean());
+ OpChainExecutionContext executionContext =
+ OpChainExecutionContext.fromQueryContext(_mailboxService,
opChainMetadata, stageMetadata, workerMetadata,
+ pipelineBreakerResult, parentContext, _sendStats.getAsBoolean());
OpChain opChain;
if (workerMetadata.isLeafStageWorker()) {
Map<String, String> rlsFilters =
RlsUtils.extractRlsFilters(requestMetadata);
@@ -327,8 +323,8 @@ public class QueryRunner {
}
}
- private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock
errorBlock,
- WorkerMetadata workerMetadata, StagePlan stagePlan) {
+ private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock
errorBlock, WorkerMetadata workerMetadata,
+ StagePlan stagePlan) {
long requestId = QueryThreadContext.getRequestId();
LOGGER.error("Error executing pipeline breaker for request: {}, stage: {},
sending error block: {}", requestId,
stageId, errorBlock);
@@ -377,10 +373,10 @@ public class QueryRunner {
try {
String planId = pair.getRight();
Map<String, String> errorMetadata = new HashMap<>();
- errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE,
t.getClass().getSimpleName());
- errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE,
+ errorMetadata.put(Response.MetadataKeys.TimeSeries.ERROR_TYPE,
t.getClass().getSimpleName());
+ errorMetadata.put(Response.MetadataKeys.TimeSeries.ERROR_MESSAGE,
t.getMessage() == null ? "Unknown error: no message" :
t.getMessage());
- errorMetadata.put(WorkerResponseMetadataKeys.PLAN_ID, planId);
+ errorMetadata.put(Response.MetadataKeys.TimeSeries.PLAN_ID, planId);
// TODO(timeseries): remove logging for failed queries.
LOGGER.warn("time-series query failed:", t);
responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
@@ -401,8 +397,9 @@ public class QueryRunner {
List<BaseTimeSeriesPlanNode> fragmentRoots =
serializedPlanFragments.stream().map(TimeSeriesPlanSerde::deserialize).collect(Collectors.toList());
TimeSeriesExecutionContext context =
- new
TimeSeriesExecutionContext(metadata.get(WorkerRequestMetadataKeys.LANGUAGE),
extractTimeBuckets(metadata),
- deadlineMs, metadata, extractPlanToSegmentMap(metadata),
Collections.emptyMap());
+ new
TimeSeriesExecutionContext(metadata.get(Request.MetadataKeys.TimeSeries.LANGUAGE),
+ extractTimeBuckets(metadata), deadlineMs, metadata,
extractPlanToSegmentMap(metadata),
+ Collections.emptyMap());
final List<BaseTimeSeriesOperator> fragmentOpChains =
fragmentRoots.stream().map(x -> {
return _timeSeriesPhysicalPlanVisitor.compile(x, context);
}).collect(Collectors.toList());
@@ -416,7 +413,7 @@ public class QueryRunner {
TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
Worker.TimeSeriesResponse response =
Worker.TimeSeriesResponse.newBuilder()
.setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
-
.putAllMetadata(ImmutableMap.of(WorkerResponseMetadataKeys.PLAN_ID,
currentPlanId))
+
.putAllMetadata(Map.of(Response.MetadataKeys.TimeSeries.PLAN_ID, currentPlanId))
.build();
responseObserver.onNext(response);
}
@@ -548,8 +545,9 @@ public class QueryRunner {
}
};
// compile OpChain
- OpChainExecutionContext executionContext =
OpChainExecutionContext.fromQueryContext(_mailboxService,
- opChainMetadata, stageMetadata, workerMetadata, null, null, false);
+ OpChainExecutionContext executionContext =
+ OpChainExecutionContext.fromQueryContext(_mailboxService,
opChainMetadata, stageMetadata, workerMetadata, null,
+ null, false);
OpChain opChain =
ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan,
_leafQueryExecutor, _executorService,
@@ -585,21 +583,21 @@ public class QueryRunner {
// Time series related utility methods below
private long extractDeadlineMs(Map<String, String> metadataMap) {
- return
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.DEADLINE_MS));
+ return
Long.parseLong(metadataMap.get(Request.MetadataKeys.TimeSeries.DEADLINE_MS));
}
private TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
- long startTimeSeconds =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
- long windowSeconds =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
- int numElements =
Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
+ long startTimeSeconds =
Long.parseLong(metadataMap.get(Request.MetadataKeys.TimeSeries.START_TIME_SECONDS));
+ long windowSeconds =
Long.parseLong(metadataMap.get(Request.MetadataKeys.TimeSeries.WINDOW_SECONDS));
+ int numElements =
Integer.parseInt(metadataMap.get(Request.MetadataKeys.TimeSeries.NUM_ELEMENTS));
return TimeBuckets.ofSeconds(startTimeSeconds,
Duration.ofSeconds(windowSeconds), numElements);
}
private Map<String, List<String>> extractPlanToSegmentMap(Map<String,
String> metadataMap) {
Map<String, List<String>> result = new HashMap<>();
for (var entry : metadataMap.entrySet()) {
- if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
- String planId =
WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
+ if (Request.MetadataKeys.TimeSeries.isKeySegmentList(entry.getKey())) {
+ String planId =
Request.MetadataKeys.TimeSeries.decodeSegmentListKey(entry.getKey());
String[] segments = entry.getValue().split(",");
result.put(planId,
Stream.of(segments).map(String::strip).collect(Collectors.toList()));
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 2848054dee8..c7e4bd5c9ba 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -29,7 +29,6 @@ import java.io.DataInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -94,9 +93,11 @@ import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.spi.utils.CommonConstants;
+import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.PlanVersions;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import
org.apache.pinot.spi.utils.CommonConstants.Query.Response.ServerResponseStatus;
import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
-import
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetadataKeys;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -122,8 +123,8 @@ public class QueryDispatcher {
private final TlsConfig _tlsConfig;
// maps broker-generated query id to the set of servers that the query was
dispatched to
private final Map<Long, Set<QueryServerInstance>> _serversByQuery;
- private final PhysicalTimeSeriesBrokerPlanVisitor
_timeSeriesBrokerPlanVisitor
- = new PhysicalTimeSeriesBrokerPlanVisitor();
+ private final PhysicalTimeSeriesBrokerPlanVisitor
_timeSeriesBrokerPlanVisitor =
+ new PhysicalTimeSeriesBrokerPlanVisitor();
private final FailureDetector _failureDetector;
private final Duration _cancelTimeout;
@@ -223,19 +224,17 @@ public class QueryDispatcher {
long requestId = context.getRequestId();
List<PlanNode> planNodes = new ArrayList<>();
- Set<DispatchablePlanFragment> plans = Collections.singleton(fragment);
+ Set<DispatchablePlanFragment> plans = Set.of(fragment);
Set<QueryServerInstance> servers = new HashSet<>();
try {
SendRequest<Worker.QueryRequest, List<Worker.ExplainResponse>>
requestSender = DispatchClient::explain;
execute(requestId, plans, timeoutMs, queryOptions, requestSender,
servers, (responses, serverInstance) -> {
for (Worker.ExplainResponse response : responses) {
- if
(response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR))
{
+ if (response.containsMetadata(ServerResponseStatus.STATUS_ERROR)) {
cancel(requestId, servers);
throw new RuntimeException(
String.format("Unable to explain query plan for request: %d on
server: %s, ERROR: %s", requestId,
- serverInstance,
-
response.getMetadataOrDefault(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
- "null")));
+ serverInstance,
response.getMetadataOrDefault(ServerResponseStatus.STATUS_ERROR, "null")));
}
for (Worker.StagePlan stagePlan : response.getStagePlanList()) {
try {
@@ -244,8 +243,8 @@ public class QueryDispatcher {
planNodes.add(PlanNodeDeserializer.process(planNode));
} catch (InvalidProtocolBufferException e) {
cancel(requestId, servers);
- throw new RuntimeException("Failed to parse explain plan node
for request " + requestId + " from server "
- + serverInstance, e);
+ throw new RuntimeException(
+ "Failed to parse explain plan node for request " + requestId
+ " from server " + serverInstance, e);
}
}
}
@@ -259,23 +258,20 @@ public class QueryDispatcher {
}
@VisibleForTesting
- void submit(
- long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
Set<QueryServerInstance> serversOut,
- Map<String, String> queryOptions)
+ void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long
timeoutMs,
+ Set<QueryServerInstance> serversOut, Map<String, String> queryOptions)
throws Exception {
SendRequest<Worker.QueryRequest, Worker.QueryResponse> requestSender =
DispatchClient::submit;
Set<DispatchablePlanFragment> plansWithoutRoot =
dispatchableSubPlan.getQueryStagesWithoutRoot();
execute(requestId, plansWithoutRoot, timeoutMs, queryOptions,
requestSender, serversOut,
(response, serverInstance) -> {
- if
(response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR))
{
- cancel(requestId, serversOut);
- throw new RuntimeException(
- String.format("Unable to execute query plan for request: %d on
server: %s, ERROR: %s", requestId,
- serverInstance,
-
response.getMetadataOrDefault(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
- "null")));
- }
- });
+ if (response.containsMetadata(ServerResponseStatus.STATUS_ERROR)) {
+ cancel(requestId, serversOut);
+ throw new RuntimeException(
+ String.format("Unable to execute query plan for request: %d on
server: %s, ERROR: %s", requestId,
+ serverInstance,
response.getMetadataOrDefault(ServerResponseStatus.STATUS_ERROR, "null")));
+ }
+ });
if (isQueryCancellationEnabled()) {
_serversByQuery.put(requestId, serversOut);
}
@@ -308,17 +304,13 @@ public class QueryDispatcher {
return _serversByQuery != null;
}
- private <E> void execute(long requestId, Set<DispatchablePlanFragment>
stagePlans,
- long timeoutMs, Map<String, String> queryOptions,
- SendRequest<Worker.QueryRequest, E> sendRequest,
Set<QueryServerInstance> serverInstancesOut,
- BiConsumer<E, QueryServerInstance> resultConsumer)
+ private <E> void execute(long requestId, Set<DispatchablePlanFragment>
stagePlans, long timeoutMs,
+ Map<String, String> queryOptions, SendRequest<Worker.QueryRequest, E>
sendRequest,
+ Set<QueryServerInstance> serverInstancesOut, BiConsumer<E,
QueryServerInstance> resultConsumer)
throws ExecutionException, InterruptedException, TimeoutException {
-
Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
- Map<DispatchablePlanFragment, StageInfo> stageInfos =
- serializePlanFragments(stagePlans, serverInstancesOut, deadline);
-
+ Map<DispatchablePlanFragment, StageInfo> stageInfos =
serializePlanFragments(stagePlans, serverInstancesOut);
if (serverInstancesOut.isEmpty()) {
return;
}
@@ -372,10 +364,8 @@ public class QueryDispatcher {
// subsequent query failures
if
(getOrCreateDispatchClient(resp.getServerInstance()).getChannel().getState(false)
!= ConnectivityState.READY) {
- _failureDetector.markServerUnhealthy(
- resp.getServerInstance().getInstanceId(),
- resp.getServerInstance().getHostname()
- );
+
_failureDetector.markServerUnhealthy(resp.getServerInstance().getInstanceId(),
+ resp.getServerInstance().getHostname());
}
throw new RuntimeException(
String.format("Error dispatching query: %d to server: %s",
requestId, resp.getServerInstance()),
@@ -399,24 +389,24 @@ public class QueryDispatcher {
RequestContext requestContext, String instanceId) {
Map<String, String> result = new HashMap<>();
TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
- result.put(WorkerRequestMetadataKeys.LANGUAGE,
dispatchablePlan.getLanguage());
- result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS,
Long.toString(timeBuckets.getTimeBuckets()[0]));
- result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS,
Long.toString(timeBuckets.getBucketSize().getSeconds()));
- result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS,
Long.toString(timeBuckets.getTimeBuckets().length));
- result.put(WorkerRequestMetadataKeys.DEADLINE_MS,
Long.toString(deadlineMs));
+ result.put(MetadataKeys.TimeSeries.LANGUAGE,
dispatchablePlan.getLanguage());
+ result.put(MetadataKeys.TimeSeries.START_TIME_SECONDS,
Long.toString(timeBuckets.getTimeBuckets()[0]));
+ result.put(MetadataKeys.TimeSeries.WINDOW_SECONDS,
Long.toString(timeBuckets.getBucketSize().getSeconds()));
+ result.put(MetadataKeys.TimeSeries.NUM_ELEMENTS,
Long.toString(timeBuckets.getTimeBuckets().length));
+ result.put(MetadataKeys.TimeSeries.DEADLINE_MS, Long.toString(deadlineMs));
Map<String, List<String>> leafIdToSegments =
dispatchablePlan.getLeafIdToSegmentsByInstanceId().get(instanceId);
for (Map.Entry<String, List<String>> entry : leafIdToSegments.entrySet()) {
-
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()),
String.join(",", entry.getValue()));
+ result.put(MetadataKeys.TimeSeries.encodeSegmentListKey(entry.getKey()),
String.join(",", entry.getValue()));
}
- result.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
Long.toString(requestContext.getRequestId()));
- result.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID,
requestContext.getBrokerId());
+ result.put(MetadataKeys.REQUEST_ID,
Long.toString(requestContext.getRequestId()));
+ result.put(MetadataKeys.BROKER_ID, requestContext.getBrokerId());
return result;
}
private static Worker.QueryRequest createRequest(QueryServerInstance
serverInstance,
Map<DispatchablePlanFragment, StageInfo> stageInfos, ByteString
protoRequestMetadata) {
Worker.QueryRequest.Builder requestBuilder =
Worker.QueryRequest.newBuilder();
-
requestBuilder.setVersion(CommonConstants.MultiStageQueryRunner.PlanVersions.V1);
+ requestBuilder.setVersion(PlanVersions.V1);
for (Map.Entry<DispatchablePlanFragment, StageInfo> entry :
stageInfos.entrySet()) {
DispatchablePlanFragment stagePlan = entry.getKey();
@@ -433,13 +423,11 @@ public class QueryDispatcher {
Worker.StagePlan requestStagePlan = Worker.StagePlan.newBuilder()
.setRootNode(stageInfo._rootNode)
- .setStageMetadata(
- Worker.StageMetadata.newBuilder()
- .setStageId(stagePlan.getPlanFragment().getFragmentId())
- .addAllWorkerMetadata(protoWorkerMetadataList)
- .setCustomProperty(stageInfo._customProperty)
- .build()
- )
+ .setStageMetadata(Worker.StageMetadata.newBuilder()
+ .setStageId(stagePlan.getPlanFragment().getFragmentId())
+ .addAllWorkerMetadata(protoWorkerMetadataList)
+ .setCustomProperty(stageInfo._customProperty)
+ .build())
.build();
requestBuilder.addStagePlan(requestStagePlan);
}
@@ -451,19 +439,17 @@ public class QueryDispatcher {
private static Map<String, String> prepareRequestMetadata(long requestId,
String cid,
Map<String, String> queryOptions, Deadline deadline) {
Map<String, String> requestMetadata = new HashMap<>();
- requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
Long.toString(requestId));
-
requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID,
cid);
-
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
- Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
-
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
+ requestMetadata.put(MetadataKeys.REQUEST_ID, Long.toString(requestId));
+ requestMetadata.put(MetadataKeys.CORRELATION_ID, cid);
+ requestMetadata.put(QueryOptionKey.TIMEOUT_MS,
Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
+ requestMetadata.put(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
Long.toString(QueryThreadContext.getPassiveDeadlineMs() -
QueryThreadContext.getActiveDeadlineMs()));
requestMetadata.putAll(queryOptions);
return requestMetadata;
}
- private Map<DispatchablePlanFragment, StageInfo> serializePlanFragments(
- Set<DispatchablePlanFragment> stagePlans,
- Set<QueryServerInstance> serverInstances, Deadline deadline)
+ private Map<DispatchablePlanFragment, StageInfo>
serializePlanFragments(Set<DispatchablePlanFragment> stagePlans,
+ Set<QueryServerInstance> serverInstances)
throws InterruptedException, ExecutionException {
List<CompletableFuture<Pair<DispatchablePlanFragment, StageInfo>>>
stageInfoFutures =
new ArrayList<>(stagePlans.size());
@@ -530,7 +516,6 @@ public class QueryDispatcher {
return true;
}
-
@Nullable
private MultiStageQueryStats cancelWithStats(long requestId, @Nullable
Set<QueryServerInstance> servers) {
if (servers == null) {
@@ -539,8 +524,8 @@ public class QueryDispatcher {
Deadline deadline = Deadline.after(_cancelTimeout.toMillis(),
TimeUnit.MILLISECONDS);
SendRequest<Long, Worker.CancelResponse> sendRequest =
DispatchClient::cancel;
- BlockingQueue<AsyncResponse<Worker.CancelResponse>> dispatchCallbacks =
dispatch(sendRequest, servers, deadline,
- serverInstance -> requestId);
+ BlockingQueue<AsyncResponse<Worker.CancelResponse>> dispatchCallbacks =
+ dispatch(sendRequest, servers, deadline, serverInstance -> requestId);
MultiStageQueryStats stats = MultiStageQueryStats.emptyStats(0);
StatMap<BaseMailboxReceiveOperator.StatKey> rootStats = new
StatMap<>(BaseMailboxReceiveOperator.StatKey.class);
@@ -549,8 +534,7 @@ public class QueryDispatcher {
processResults(requestId, servers.size(), (response, server) -> {
Map<Integer, ByteString> statsByStage = response.getStatsByStageMap();
for (Map.Entry<Integer, ByteString> entry : statsByStage.entrySet()) {
- try (InputStream is = entry.getValue().newInput();
- DataInputStream dis = new DataInputStream(is)) {
+ try (InputStream is = entry.getValue().newInput(); DataInputStream
dis = new DataInputStream(is)) {
MultiStageQueryStats.StageStats.Closed closed =
MultiStageQueryStats.StageStats.Closed.deserialize(dis);
stats.mergeUpstream(entry.getKey(), closed);
} catch (Exception e) {
@@ -585,19 +569,10 @@ public class QueryDispatcher {
/// Concatenates the results of the sub-plan and returns a [QueryResult]
with the concatenated result.
///
/// This method assumes the caller thread is a query thread and therefore
[QueryThreadContext] has been initialized.
- private static QueryResult runReducer(
- DispatchableSubPlan subPlan,
- Map<String, String> queryOptions,
- MailboxService mailboxService
- ) {
- return runReducer(
- QueryThreadContext.getRequestId(),
- subPlan,
- QueryThreadContext.getActiveDeadlineMs(),
- QueryThreadContext.getPassiveDeadlineMs(),
- queryOptions,
- mailboxService
- );
+ private static QueryResult runReducer(DispatchableSubPlan subPlan,
Map<String, String> queryOptions,
+ MailboxService mailboxService) {
+ return runReducer(QueryThreadContext.getRequestId(), subPlan,
QueryThreadContext.getActiveDeadlineMs(),
+ QueryThreadContext.getPassiveDeadlineMs(), queryOptions,
mailboxService);
}
/// Concatenates the results of the sub-plan and returns a [QueryResult]
with the concatenated result.
@@ -607,26 +582,22 @@ public class QueryDispatcher {
///
/// Remember that in MSE there is no actual reduce but rather a single stage
that concatenates the results.
@VisibleForTesting
- public static QueryResult runReducer(long requestId,
- DispatchableSubPlan subPlan,
- long activeDeadlineMs,
- long passiveDeadlineMs,
- Map<String, String> queryOptions,
- MailboxService mailboxService) {
+ public static QueryResult runReducer(long requestId, DispatchableSubPlan
subPlan, long activeDeadlineMs,
+ long passiveDeadlineMs, Map<String, String> queryOptions, MailboxService
mailboxService) {
long startTimeMs = System.currentTimeMillis();
// NOTE: Reduce stage is always stage 0
DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
PlanFragment planFragment = stagePlan.getPlanFragment();
PlanNode rootNode = planFragment.getFragmentRoot();
List<WorkerMetadata> workerMetadata = stagePlan.getWorkerMetadataList();
- Preconditions.checkState(workerMetadata.size() == 1,
- "Expecting single worker for reduce stage, got: %s",
workerMetadata.size());
+ Preconditions.checkState(workerMetadata.size() == 1, "Expecting single
worker for reduce stage, got: %s",
+ workerMetadata.size());
StageMetadata stageMetadata = new StageMetadata(0, workerMetadata,
stagePlan.getCustomProperties());
ThreadExecutionContext parentContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
OpChainExecutionContext executionContext =
- new OpChainExecutionContext(mailboxService, requestId,
activeDeadlineMs, passiveDeadlineMs,
- queryOptions, stageMetadata, workerMetadata.get(0), null,
parentContext, true);
+ new OpChainExecutionContext(mailboxService, requestId,
activeDeadlineMs, passiveDeadlineMs, queryOptions,
+ stageMetadata, workerMetadata.get(0), null, parentContext, true);
PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
DataSchema sourceSchema = rootNode.getDataSchema();
@@ -643,9 +614,9 @@ public class QueryDispatcher {
ArrayList<Object[]> resultRows = new ArrayList<>();
MseBlock block;
MultiStageQueryStats queryStats;
- try (
- QueryThreadContext.CloseableContext mseCloseableCtx =
MseWorkerThreadContext.open();
- OpChain opChain = PlanNodeToOpChain.convert(rootNode,
executionContext, (a, b) -> { })) {
+ try (QueryThreadContext.CloseableContext mseCloseableCtx =
MseWorkerThreadContext.open();
+ OpChain opChain = PlanNodeToOpChain.convert(rootNode,
executionContext, (a, b) -> {
+ })) {
MseWorkerThreadContext.setStageId(0);
MseWorkerThreadContext.setWorkerId(0);
MultiStageOperator rootOperator = opChain.getRoot();
@@ -692,11 +663,10 @@ public class QueryDispatcher {
error = queryExceptions.entrySet().iterator().next();
errorMessage = "Received 1 error" + from + ": " + error.getValue();
} else {
- error = queryExceptions.entrySet().stream()
- .max(QueryDispatcher::compareErrors)
- .orElseThrow();
- errorMessage = "Received " + queryExceptions.size() + " errors" + from
+ ". "
- + "The one with highest priority is: " + error.getValue();
+ error =
queryExceptions.entrySet().stream().max(QueryDispatcher::compareErrors).orElseThrow();
+ errorMessage =
+ "Received " + queryExceptions.size() + " errors" + from + ". " +
"The one with highest priority is: "
+ + error.getValue();
}
QueryProcessingException processingEx = new
QueryProcessingException(error.getKey().getId(), errorMessage);
return new QueryResult(processingEx, queryStats,
System.currentTimeMillis() - startTimeMs);
@@ -748,10 +718,11 @@ public class QueryDispatcher {
Map<String, BlockingQueue<Object>> receiversByPlanId = new HashMap<>();
populateConsumers(brokerFragment, receiversByPlanId);
// Compile brokerFragment to get operators
- TimeSeriesExecutionContext brokerExecutionContext = new
TimeSeriesExecutionContext(plan.getLanguage(),
- plan.getTimeBuckets(), deadlineMs, Collections.emptyMap(),
Collections.emptyMap(), receiversByPlanId);
- BaseTimeSeriesOperator brokerOperator =
_timeSeriesBrokerPlanVisitor.compile(brokerFragment,
- brokerExecutionContext, plan.getNumInputServersForExchangePlanNode());
+ TimeSeriesExecutionContext brokerExecutionContext =
+ new TimeSeriesExecutionContext(plan.getLanguage(),
plan.getTimeBuckets(), deadlineMs, Map.of(), Map.of(),
+ receiversByPlanId);
+ BaseTimeSeriesOperator brokerOperator =
_timeSeriesBrokerPlanVisitor.compile(brokerFragment, brokerExecutionContext,
+ plan.getNumInputServersForExchangePlanNode());
// Create dispatch observer for each query server
for (TimeSeriesQueryServerInstance serverInstance :
plan.getQueryServerInstances()) {
String serverId = serverInstance.getInstanceId();
@@ -761,10 +732,9 @@ public class QueryDispatcher {
Worker.TimeSeriesQueryRequest request =
Worker.TimeSeriesQueryRequest.newBuilder()
.addAllDispatchPlan(plan.getSerializedServerFragments())
.putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs,
requestContext, serverId))
- .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
Long.toString(requestId))
+ .putMetadata(MetadataKeys.REQUEST_ID, Long.toString(requestId))
.build();
- TimeSeriesDispatchObserver
- dispatchObserver = new TimeSeriesDispatchObserver(receiversByPlanId);
+ TimeSeriesDispatchObserver dispatchObserver = new
TimeSeriesDispatchObserver(receiversByPlanId);
getOrCreateTimeSeriesDispatchClient(serverInstance).submit(request,
deadline, dispatchObserver);
}
// Execute broker fragment
@@ -853,7 +823,7 @@ public class QueryDispatcher {
}
private interface SendRequest<R, E> {
- void send(DispatchClient dispatchClient, R request, QueryServerInstance serverInstance,
- Deadline deadline, Consumer<AsyncResponse<E>> callbackConsumer);
+ void send(DispatchClient dispatchClient, R request, QueryServerInstance serverInstance, Deadline deadline,
+ Consumer<AsyncResponse<E>> callbackConsumer);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
index 599ce414c0c..f285418fc7b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
-import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys.TimeSeries;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,13 +50,13 @@ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSer
@Override
public void onNext(Worker.TimeSeriesResponse timeSeriesResponse) {
- if (timeSeriesResponse.containsMetadata(WorkerResponseMetadataKeys.ERROR_TYPE)) {
- String errorType = timeSeriesResponse.getMetadataOrDefault(WorkerResponseMetadataKeys.ERROR_TYPE, "");
- String errorMessage = timeSeriesResponse.getMetadataOrDefault(WorkerResponseMetadataKeys.ERROR_MESSAGE, "");
+ if (timeSeriesResponse.containsMetadata(TimeSeries.ERROR_TYPE)) {
+ String errorType = timeSeriesResponse.getMetadataOrDefault(TimeSeries.ERROR_TYPE, "");
+ String errorMessage = timeSeriesResponse.getMetadataOrDefault(TimeSeries.ERROR_MESSAGE, "");
onError(new Throwable(String.format("Error in server (type: %s): %s", errorType, errorMessage)));
return;
}
- String planId = timeSeriesResponse.getMetadataMap().get(WorkerResponseMetadataKeys.PLAN_ID);
+ String planId = timeSeriesResponse.getMetadataMap().get(TimeSeries.PLAN_ID);
TimeSeriesBlock block = null;
Throwable error = null;
try {
@@ -66,8 +66,9 @@ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSer
}
BlockingQueue<Object> receiverForPlanId = _exchangeReceiversByPlanId.get(planId);
if (receiverForPlanId == null) {
- String message = String.format("Receiver is not initialized for planId: %s. Receivers exist only for planIds: %s",
- planId, _exchangeReceiversByPlanId.keySet());
+ String message =
+ String.format("Receiver is not initialized for planId: %s. Receivers exist only for planIds: %s", planId,
+ _exchangeReceiversByPlanId.keySet());
LOGGER.warn(message);
onError(new IllegalStateException(message));
} else {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 10cf20e7ab4..5697eef08e6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1801,6 +1801,29 @@ public class CommonConstants {
public static final String ENABLE_TRACE = "enableTrace";
public static final String ENABLE_STREAMING = "enableStreaming";
public static final String PAYLOAD_TYPE = "payloadType";
+
+ public static class TimeSeries {
+ public static final String LANGUAGE = "language";
+ public static final String START_TIME_SECONDS = "startTimeSeconds";
+ public static final String WINDOW_SECONDS = "windowSeconds";
+ public static final String NUM_ELEMENTS = "numElements";
+ public static final String DEADLINE_MS = "deadlineMs";
+
+ public static final String SEGMENT_MAP_ENTRY_PREFIX = "$segmentMapEntry#";
+
+ public static boolean isKeySegmentList(String key) {
+ return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);
+ }
+
+ public static String encodeSegmentListKey(String planId) {
+ return SEGMENT_MAP_ENTRY_PREFIX + planId;
+ }
+
+ /// Returns the plan-id corresponding to the encoded key.
+ public static String decodeSegmentListKey(String key) {
+ return key.substring(SEGMENT_MAP_ENTRY_PREFIX.length());
+ }
+ }
}
public static class PayloadType {
@@ -1812,6 +1835,12 @@ public class CommonConstants {
public static class Response {
public static class MetadataKeys {
public static final String RESPONSE_TYPE = "responseType";
+
+ public static class TimeSeries {
+ public static final String PLAN_ID = "planId";
+ public static final String ERROR_TYPE = "errorType";
+ public static final String ERROR_MESSAGE = "error";
+ }
}
public static class ResponseType {
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
deleted file mode 100644
index eaf5b363861..00000000000
--- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tsdb.planner;
-
-public class TimeSeriesPlanConstants {
- private TimeSeriesPlanConstants() {
- }
-
- public static class WorkerRequestMetadataKeys {
- private static final String SEGMENT_MAP_ENTRY_PREFIX = "$segmentMapEntry#";
-
- private WorkerRequestMetadataKeys() {
- }
-
- public static final String LANGUAGE = "language";
- public static final String START_TIME_SECONDS = "startTimeSeconds";
- public static final String WINDOW_SECONDS = "windowSeconds";
- public static final String NUM_ELEMENTS = "numElements";
- public static final String DEADLINE_MS = "deadlineMs";
-
- public static boolean isKeySegmentList(String key) {
- return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);
- }
-
- public static String encodeSegmentListKey(String planId) {
- return SEGMENT_MAP_ENTRY_PREFIX + planId;
- }
-
- /**
- * Returns the plan-id corresponding to the encoded key.
- */
- public static String decodeSegmentListKey(String key) {
- return key.substring(SEGMENT_MAP_ENTRY_PREFIX.length());
- }
- }
-
- public static class WorkerResponseMetadataKeys {
- public static final String PLAN_ID = "planId";
- public static final String ERROR_TYPE = "errorType";
- public static final String ERROR_MESSAGE = "error";
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]