This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a8a0a33864 [timeseries] Fix Server Selection Bug + Enforce Timeout (#14426)
a8a0a33864 is described below
commit a8a0a3386402aa9af13c3e046e2e4be80f0f27bf
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Nov 12 04:17:06 2024 -0600
[timeseries] Fix Server Selection Bug + Enforce Timeout (#14426)
* [timeseries] Fix Server Selection Bug + Enforce Timeout
* pass requestId and brokerId to servers
* undo the 2 server quickstart change
---
.../requesthandler/TimeSeriesRequestHandler.java | 2 +
.../apache/pinot/query/runtime/QueryRunner.java | 53 +++++++++++++---------
.../timeseries/PhysicalTimeSeriesPlanVisitor.java | 20 ++++++--
.../timeseries/TimeSeriesExecutionContext.java | 14 +++++-
.../query/service/dispatch/QueryDispatcher.java | 13 ++++--
.../PhysicalTimeSeriesPlanVisitorTest.java | 9 +++-
.../tsdb/planner/TimeSeriesPlanConstants.java | 1 +
7 files changed, 81 insertions(+), 31 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index ade9d6c9a6..1773fca957 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -90,6 +90,8 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
RequestContext requestContext) {
+ requestContext.setBrokerId(_brokerId);
+ requestContext.setRequestId(_requestIdGenerator.get());
RangeTimeSeriesRequest timeSeriesRequest = null;
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang,
rawQueryParamString);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a0739162a6..65c8d4118d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
@@ -259,11 +260,14 @@ public class QueryRunner {
responseObserver.onCompleted();
};
try {
+ final long timeoutMs = extractTimeoutMs(metadata);
+ Preconditions.checkState(timeoutMs > 0,
+ "Query timed out before getting processed in server. Remaining time:
%s", timeoutMs);
// Deserialize plan, and compile to create a tree of operators.
BaseTimeSeriesPlanNode rootNode =
TimeSeriesPlanSerde.deserialize(serializedPlan);
TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
metadata.get(WorkerRequestMetadataKeys.LANGUAGE),
extractTimeBuckets(metadata),
- extractPlanToSegmentMap(metadata));
+ extractPlanToSegmentMap(metadata), timeoutMs, metadata);
BaseTimeSeriesOperator operator =
PhysicalTimeSeriesPlanVisitor.INSTANCE.compile(rootNode, context);
// Run the operator using the same executor service as
OpChainSchedulerService
_executorService.submit(() -> {
@@ -286,26 +290,6 @@ public class QueryRunner {
}
}
- public TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
- long startTimeSeconds =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
- long windowSeconds =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
- int numElements =
Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
- return TimeBuckets.ofSeconds(startTimeSeconds,
Duration.ofSeconds(windowSeconds), numElements);
- }
-
- public Map<String, List<String>> extractPlanToSegmentMap(Map<String, String>
metadataMap) {
- Map<String, List<String>> result = new HashMap<>();
- for (var entry : metadataMap.entrySet()) {
- if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
- String planId =
WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
- String[] segments = entry.getValue().split(",");
- result.put(planId,
-
Stream.of(segments).map(String::strip).collect(Collectors.toList()));
- }
- }
- return result;
- }
-
private Map<String, String> consolidateMetadata(Map<String, String>
customProperties,
Map<String, String> requestMetadata) {
Map<String, String> opChainMetadata = new HashMap<>();
@@ -415,4 +399,31 @@ public class QueryRunner {
return node;
}
}
+
+ // Time series related utility methods below
+
+ private long extractTimeoutMs(Map<String, String> metadataMap) {
+ long deadlineMs =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.DEADLINE_MS));
+ return deadlineMs - System.currentTimeMillis();
+ }
+
+ private TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
+ long startTimeSeconds =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
+ long windowSeconds =
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
+ int numElements =
Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
+ return TimeBuckets.ofSeconds(startTimeSeconds,
Duration.ofSeconds(windowSeconds), numElements);
+ }
+
+ private Map<String, List<String>> extractPlanToSegmentMap(Map<String,
String> metadataMap) {
+ Map<String, List<String>> result = new HashMap<>();
+ for (var entry : metadataMap.entrySet()) {
+ if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
+ String planId =
WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
+ String[] segments = entry.getValue().split(",");
+ result.put(planId,
+
Stream.of(segments).map(String::strip).collect(Collectors.toList()));
+ }
+ }
+ return result;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
index dc7c704f29..d300232625 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
@@ -18,8 +18,12 @@
*/
package org.apache.pinot.query.runtime.timeseries;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -30,6 +34,8 @@ import
org.apache.pinot.common.request.context.TimeSeriesContext;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -78,10 +84,11 @@ public class PhysicalTimeSeriesPlanVisitor {
public ServerQueryRequest
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String>
segments,
TimeSeriesExecutionContext context) {
return new ServerQueryRequest(compileQueryContext(leafNode, context),
- segments, /* TODO: Pass metadata from request */
Collections.emptyMap(), _serverMetrics);
+ segments, getServerQueryRequestMetadataMap(context), _serverMetrics);
}
- public QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode,
TimeSeriesExecutionContext context) {
+ @VisibleForTesting
+ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode,
TimeSeriesExecutionContext context) {
FilterContext filterContext =
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
leafNode.getEffectiveFilter(context.getInitialTimeBuckets())));
@@ -96,10 +103,17 @@ public class PhysicalTimeSeriesPlanVisitor {
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(Collections.emptyList())
- .setQueryOptions(Collections.emptyMap())
+ .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS,
Long.toString(context.getTimeoutMs())))
.setAliasList(Collections.emptyList())
.setTimeSeriesContext(timeSeriesContext)
.setLimit(Integer.MAX_VALUE)
.build();
}
+
+ Map<String, String>
getServerQueryRequestMetadataMap(TimeSeriesExecutionContext context) {
+ Map<String, String> result = new HashMap<>();
+ result.put(MetadataKeys.BROKER_ID,
context.getMetadataMap().get(MetadataKeys.BROKER_ID));
+ result.put(MetadataKeys.REQUEST_ID,
context.getMetadataMap().get(MetadataKeys.REQUEST_ID));
+ return result;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
index 4d093af3ed..544443ab1c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
@@ -27,12 +27,16 @@ public class TimeSeriesExecutionContext {
private final String _language;
private final TimeBuckets _initialTimeBuckets;
private final Map<String, List<String>> _planIdToSegmentsMap;
+ private final long _timeoutMs;
+ private final Map<String, String> _metadataMap;
public TimeSeriesExecutionContext(String language, TimeBuckets
initialTimeBuckets,
- Map<String, List<String>> planIdToSegmentsMap) {
+ Map<String, List<String>> planIdToSegmentsMap, long timeoutMs,
Map<String, String> metadataMap) {
_language = language;
_initialTimeBuckets = initialTimeBuckets;
_planIdToSegmentsMap = planIdToSegmentsMap;
+ _timeoutMs = timeoutMs;
+ _metadataMap = metadataMap;
}
public String getLanguage() {
@@ -46,4 +50,12 @@ public class TimeSeriesExecutionContext {
public Map<String, List<String>> getPlanIdToSegmentsMap() {
return _planIdToSegmentsMap;
}
+
+ public long getTimeoutMs() {
+ return _timeoutMs;
+ }
+
+ public Map<String, String> getMetadataMap() {
+ return _metadataMap;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index fc10c96346..aac49b5b63 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -175,7 +175,7 @@ public class QueryDispatcher {
long requestId = context.getRequestId();
BlockingQueue<AsyncQueryTimeSeriesDispatchResponse> receiver = new
ArrayBlockingQueue<>(10);
try {
- submit(requestId, plan, timeoutMs, queryOptions, receiver::offer);
+ submit(requestId, plan, timeoutMs, queryOptions, context,
receiver::offer);
AsyncQueryTimeSeriesDispatchResponse received = receiver.poll(timeoutMs,
TimeUnit.MILLISECONDS);
if (received == null) {
return PinotBrokerTimeSeriesResponse.newErrorResponse(
@@ -285,13 +285,14 @@ public class QueryDispatcher {
}
void submit(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs,
Map<String, String> queryOptions,
- Consumer<AsyncQueryTimeSeriesDispatchResponse> receiver)
+ RequestContext requestContext,
Consumer<AsyncQueryTimeSeriesDispatchResponse> receiver)
throws Exception {
Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
+ long deadlineMs = System.currentTimeMillis() + timeoutMs;
String serializedPlan = plan.getSerializedPlan();
Worker.TimeSeriesQueryRequest request =
Worker.TimeSeriesQueryRequest.newBuilder()
.setDispatchPlan(ByteString.copyFrom(serializedPlan,
StandardCharsets.UTF_8))
- .putAllMetadata(initializeTimeSeriesMetadataMap(plan))
+ .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs,
requestContext))
.putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
Long.toString(requestId))
.build();
getOrCreateTimeSeriesDispatchClient(plan.getQueryServerInstance()).submit(request,
@@ -300,16 +301,20 @@ public class QueryDispatcher {
deadline, receiver::accept);
};
- Map<String, String>
initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) {
+ Map<String, String>
initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan,
long deadlineMs,
+ RequestContext requestContext) {
Map<String, String> result = new HashMap<>();
TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
result.put(WorkerRequestMetadataKeys.LANGUAGE,
dispatchablePlan.getLanguage());
result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS,
Long.toString(timeBuckets.getTimeBuckets()[0]));
result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS,
Long.toString(timeBuckets.getBucketSize().getSeconds()));
result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS,
Long.toString(timeBuckets.getTimeBuckets().length));
+ result.put(WorkerRequestMetadataKeys.DEADLINE_MS,
Long.toString(deadlineMs));
for (Map.Entry<String, List<String>> entry :
dispatchablePlan.getPlanIdToSegments().entrySet()) {
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()),
String.join(",", entry.getValue()));
}
+ result.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
Long.toString(requestContext.getRequestId()));
+ result.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID,
requestContext.getBrokerId());
return result;
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
index c9a687c5ff..fabfde6829 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
@@ -32,6 +33,8 @@ import static org.testng.Assert.assertNotNull;
public class PhysicalTimeSeriesPlanVisitorTest {
+ private static final int DUMMY_TIMEOUT_MS = 10_000;
+
@Test
public void testCompileQueryContext() {
final String planId = "id";
@@ -43,7 +46,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
{
TimeSeriesExecutionContext context =
new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L,
Duration.ofSeconds(10), 100),
- Collections.emptyMap());
+ Collections.emptyMap(), DUMMY_TIMEOUT_MS,
Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(),
tableName, timeColumn, TimeUnit.SECONDS, 0L,
filterExpr, "orderCount", aggInfo,
Collections.singletonList("cityName"));
@@ -55,12 +58,13 @@ public class PhysicalTimeSeriesPlanVisitorTest {
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(),
"orderCount");
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '990' AND orderTime <=
'1990')");
+
assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)),
DUMMY_TIMEOUT_MS);
}
// Case-2: With offset, complex group-by expression, complex value, and
non-empty filter
{
TimeSeriesExecutionContext context =
new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L,
Duration.ofSeconds(10), 100),
- Collections.emptyMap());
+ Collections.emptyMap(), DUMMY_TIMEOUT_MS,
Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(),
tableName, timeColumn, TimeUnit.SECONDS, 10L,
filterExpr, "orderCount*2", aggInfo,
Collections.singletonList("concat(cityName, stateName, '-')"));
@@ -76,6 +80,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
assertNotNull(queryContext.getFilter());
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '980' AND orderTime <=
'1980')");
+
assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)),
DUMMY_TIMEOUT_MS);
}
}
}
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
index f764ea720e..b6a2035b52 100644
---
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
+++
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
@@ -32,6 +32,7 @@ public class TimeSeriesPlanConstants {
public static final String START_TIME_SECONDS = "startTimeSeconds";
public static final String WINDOW_SECONDS = "windowSeconds";
public static final String NUM_ELEMENTS = "numElements";
+ public static final String DEADLINE_MS = "deadlineMs";
public static boolean isKeySegmentList(String key) {
return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]