This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a8a0a33864 [timeseries] Fix Server Selection Bug + Enforce Timeout (#14426)
a8a0a33864 is described below

commit a8a0a3386402aa9af13c3e046e2e4be80f0f27bf
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Nov 12 04:17:06 2024 -0600

    [timeseries] Fix Server Selection Bug + Enforce Timeout (#14426)
    
    * [timeseries] Fix Server Selection Bug + Enforce Timeout
    
    * pass requestId and brokerId to servers
    
    * undo the 2 server quickstart change
---
 .../requesthandler/TimeSeriesRequestHandler.java   |  2 +
 .../apache/pinot/query/runtime/QueryRunner.java    | 53 +++++++++++++---------
 .../timeseries/PhysicalTimeSeriesPlanVisitor.java  | 20 ++++++--
 .../timeseries/TimeSeriesExecutionContext.java     | 14 +++++-
 .../query/service/dispatch/QueryDispatcher.java    | 13 ++++--
 .../PhysicalTimeSeriesPlanVisitorTest.java         |  9 +++-
 .../tsdb/planner/TimeSeriesPlanConstants.java      |  1 +
 7 files changed, 81 insertions(+), 31 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index ade9d6c9a6..1773fca957 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -90,6 +90,8 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
   @Override
   public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
       RequestContext requestContext) {
+    requestContext.setBrokerId(_brokerId);
+    requestContext.setRequestId(_requestIdGenerator.get());
     RangeTimeSeriesRequest timeSeriesRequest = null;
     try {
       timeSeriesRequest = buildRangeTimeSeriesRequest(lang, 
rawQueryParamString);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a0739162a6..65c8d4118d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
 import java.nio.charset.StandardCharsets;
@@ -259,11 +260,14 @@ public class QueryRunner {
       responseObserver.onCompleted();
     };
     try {
+      final long timeoutMs = extractTimeoutMs(metadata);
+      Preconditions.checkState(timeoutMs > 0,
+          "Query timed out before getting processed in server. Remaining time: 
%s", timeoutMs);
       // Deserialize plan, and compile to create a tree of operators.
       BaseTimeSeriesPlanNode rootNode = 
TimeSeriesPlanSerde.deserialize(serializedPlan);
       TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
           metadata.get(WorkerRequestMetadataKeys.LANGUAGE), 
extractTimeBuckets(metadata),
-          extractPlanToSegmentMap(metadata));
+          extractPlanToSegmentMap(metadata), timeoutMs, metadata);
       BaseTimeSeriesOperator operator = 
PhysicalTimeSeriesPlanVisitor.INSTANCE.compile(rootNode, context);
       // Run the operator using the same executor service as 
OpChainSchedulerService
       _executorService.submit(() -> {
@@ -286,26 +290,6 @@ public class QueryRunner {
     }
   }
 
-  public TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
-    long startTimeSeconds = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
-    long windowSeconds = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
-    int numElements = 
Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
-    return TimeBuckets.ofSeconds(startTimeSeconds, 
Duration.ofSeconds(windowSeconds), numElements);
-  }
-
-  public Map<String, List<String>> extractPlanToSegmentMap(Map<String, String> 
metadataMap) {
-    Map<String, List<String>> result = new HashMap<>();
-    for (var entry : metadataMap.entrySet()) {
-      if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
-        String planId = 
WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
-        String[] segments = entry.getValue().split(",");
-        result.put(planId,
-            
Stream.of(segments).map(String::strip).collect(Collectors.toList()));
-      }
-    }
-    return result;
-  }
-
   private Map<String, String> consolidateMetadata(Map<String, String> 
customProperties,
       Map<String, String> requestMetadata) {
     Map<String, String> opChainMetadata = new HashMap<>();
@@ -415,4 +399,31 @@ public class QueryRunner {
       return node;
     }
   }
+
+  // Time series related utility methods below
+
+  private long extractTimeoutMs(Map<String, String> metadataMap) {
+    long deadlineMs = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.DEADLINE_MS));
+    return deadlineMs - System.currentTimeMillis();
+  }
+
+  private TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
+    long startTimeSeconds = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.START_TIME_SECONDS));
+    long windowSeconds = 
Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.WINDOW_SECONDS));
+    int numElements = 
Integer.parseInt(metadataMap.get(WorkerRequestMetadataKeys.NUM_ELEMENTS));
+    return TimeBuckets.ofSeconds(startTimeSeconds, 
Duration.ofSeconds(windowSeconds), numElements);
+  }
+
+  private Map<String, List<String>> extractPlanToSegmentMap(Map<String, 
String> metadataMap) {
+    Map<String, List<String>> result = new HashMap<>();
+    for (var entry : metadataMap.entrySet()) {
+      if (WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
+        String planId = 
WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey());
+        String[] segments = entry.getValue().split(",");
+        result.put(planId,
+            
Stream.of(segments).map(String::strip).collect(Collectors.toList()));
+      }
+    }
+    return result;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
index dc7c704f29..d300232625 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pinot.query.runtime.timeseries;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -30,6 +34,8 @@ import 
org.apache.pinot.common.request.context.TimeSeriesContext;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -78,10 +84,11 @@ public class PhysicalTimeSeriesPlanVisitor {
   public ServerQueryRequest 
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String> 
segments,
       TimeSeriesExecutionContext context) {
     return new ServerQueryRequest(compileQueryContext(leafNode, context),
-        segments, /* TODO: Pass metadata from request */ 
Collections.emptyMap(), _serverMetrics);
+        segments, getServerQueryRequestMetadataMap(context), _serverMetrics);
   }
 
-  public QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, 
TimeSeriesExecutionContext context) {
+  @VisibleForTesting
+  QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, 
TimeSeriesExecutionContext context) {
     FilterContext filterContext =
         RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
             leafNode.getEffectiveFilter(context.getInitialTimeBuckets())));
@@ -96,10 +103,17 @@ public class PhysicalTimeSeriesPlanVisitor {
         .setFilter(filterContext)
         .setGroupByExpressions(groupByExpressions)
         .setSelectExpressions(Collections.emptyList())
-        .setQueryOptions(Collections.emptyMap())
+        .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, 
Long.toString(context.getTimeoutMs())))
         .setAliasList(Collections.emptyList())
         .setTimeSeriesContext(timeSeriesContext)
         .setLimit(Integer.MAX_VALUE)
         .build();
   }
+
+  Map<String, String> 
getServerQueryRequestMetadataMap(TimeSeriesExecutionContext context) {
+    Map<String, String> result = new HashMap<>();
+    result.put(MetadataKeys.BROKER_ID, 
context.getMetadataMap().get(MetadataKeys.BROKER_ID));
+    result.put(MetadataKeys.REQUEST_ID, 
context.getMetadataMap().get(MetadataKeys.REQUEST_ID));
+    return result;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
index 4d093af3ed..544443ab1c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java
@@ -27,12 +27,16 @@ public class TimeSeriesExecutionContext {
   private final String _language;
   private final TimeBuckets _initialTimeBuckets;
   private final Map<String, List<String>> _planIdToSegmentsMap;
+  private final long _timeoutMs;
+  private final Map<String, String> _metadataMap;
 
   public TimeSeriesExecutionContext(String language, TimeBuckets 
initialTimeBuckets,
-      Map<String, List<String>> planIdToSegmentsMap) {
+      Map<String, List<String>> planIdToSegmentsMap, long timeoutMs, 
Map<String, String> metadataMap) {
     _language = language;
     _initialTimeBuckets = initialTimeBuckets;
     _planIdToSegmentsMap = planIdToSegmentsMap;
+    _timeoutMs = timeoutMs;
+    _metadataMap = metadataMap;
   }
 
   public String getLanguage() {
@@ -46,4 +50,12 @@ public class TimeSeriesExecutionContext {
   public Map<String, List<String>> getPlanIdToSegmentsMap() {
     return _planIdToSegmentsMap;
   }
+
+  public long getTimeoutMs() {
+    return _timeoutMs;
+  }
+
+  public Map<String, String> getMetadataMap() {
+    return _metadataMap;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index fc10c96346..aac49b5b63 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -175,7 +175,7 @@ public class QueryDispatcher {
     long requestId = context.getRequestId();
     BlockingQueue<AsyncQueryTimeSeriesDispatchResponse> receiver = new 
ArrayBlockingQueue<>(10);
     try {
-      submit(requestId, plan, timeoutMs, queryOptions, receiver::offer);
+      submit(requestId, plan, timeoutMs, queryOptions, context, 
receiver::offer);
       AsyncQueryTimeSeriesDispatchResponse received = receiver.poll(timeoutMs, 
TimeUnit.MILLISECONDS);
       if (received == null) {
         return PinotBrokerTimeSeriesResponse.newErrorResponse(
@@ -285,13 +285,14 @@ public class QueryDispatcher {
   }
 
   void submit(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, 
Map<String, String> queryOptions,
-      Consumer<AsyncQueryTimeSeriesDispatchResponse> receiver)
+      RequestContext requestContext, 
Consumer<AsyncQueryTimeSeriesDispatchResponse> receiver)
       throws Exception {
     Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
     String serializedPlan = plan.getSerializedPlan();
     Worker.TimeSeriesQueryRequest request = 
Worker.TimeSeriesQueryRequest.newBuilder()
         .setDispatchPlan(ByteString.copyFrom(serializedPlan, 
StandardCharsets.UTF_8))
-        .putAllMetadata(initializeTimeSeriesMetadataMap(plan))
+        .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, 
requestContext))
         .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
Long.toString(requestId))
         .build();
     
getOrCreateTimeSeriesDispatchClient(plan.getQueryServerInstance()).submit(request,
@@ -300,16 +301,20 @@ public class QueryDispatcher {
         deadline, receiver::accept);
   };
 
-  Map<String, String> 
initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) {
+  Map<String, String> 
initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan, 
long deadlineMs,
+      RequestContext requestContext) {
     Map<String, String> result = new HashMap<>();
     TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
     result.put(WorkerRequestMetadataKeys.LANGUAGE, 
dispatchablePlan.getLanguage());
     result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS, 
Long.toString(timeBuckets.getTimeBuckets()[0]));
     result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, 
Long.toString(timeBuckets.getBucketSize().getSeconds()));
     result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, 
Long.toString(timeBuckets.getTimeBuckets().length));
+    result.put(WorkerRequestMetadataKeys.DEADLINE_MS, 
Long.toString(deadlineMs));
     for (Map.Entry<String, List<String>> entry : 
dispatchablePlan.getPlanIdToSegments().entrySet()) {
       
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), 
String.join(",", entry.getValue()));
     }
+    result.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
Long.toString(requestContext.getRequestId()));
+    result.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, 
requestContext.getBrokerId());
     return result;
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
index c9a687c5ff..fabfde6829 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
@@ -22,6 +22,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
@@ -32,6 +33,8 @@ import static org.testng.Assert.assertNotNull;
 
 
 public class PhysicalTimeSeriesPlanVisitorTest {
+  private static final int DUMMY_TIMEOUT_MS = 10_000;
+
   @Test
   public void testCompileQueryContext() {
     final String planId = "id";
@@ -43,7 +46,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
     {
       TimeSeriesExecutionContext context =
           new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, 
Duration.ofSeconds(10), 100),
-              Collections.emptyMap());
+              Collections.emptyMap(), DUMMY_TIMEOUT_MS, 
Collections.emptyMap());
       LeafTimeSeriesPlanNode leafNode =
           new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), 
tableName, timeColumn, TimeUnit.SECONDS, 0L,
               filterExpr, "orderCount", aggInfo, 
Collections.singletonList("cityName"));
@@ -55,12 +58,13 @@ public class PhysicalTimeSeriesPlanVisitorTest {
       
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(),
 "orderCount");
       assertEquals(queryContext.getFilter().toString(),
           "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= 
'1990')");
+      
assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)),
 DUMMY_TIMEOUT_MS);
     }
     // Case-2: With offset, complex group-by expression, complex value, and 
non-empty filter
     {
       TimeSeriesExecutionContext context =
           new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, 
Duration.ofSeconds(10), 100),
-              Collections.emptyMap());
+              Collections.emptyMap(), DUMMY_TIMEOUT_MS, 
Collections.emptyMap());
       LeafTimeSeriesPlanNode leafNode =
           new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), 
tableName, timeColumn, TimeUnit.SECONDS, 10L,
               filterExpr, "orderCount*2", aggInfo, 
Collections.singletonList("concat(cityName, stateName, '-')"));
@@ -76,6 +80,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
       assertNotNull(queryContext.getFilter());
       assertEquals(queryContext.getFilter().toString(),
           "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= 
'1980')");
+      
assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)),
 DUMMY_TIMEOUT_MS);
     }
   }
 }
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
index f764ea720e..b6a2035b52 100644
--- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanConstants.java
@@ -32,6 +32,7 @@ public class TimeSeriesPlanConstants {
     public static final String START_TIME_SECONDS = "startTimeSeconds";
     public static final String WINDOW_SECONDS = "windowSeconds";
     public static final String NUM_ELEMENTS = "numElements";
+    public static final String DEADLINE_MS = "deadlineMs";
 
     public static boolean isKeySegmentList(String key) {
       return key.startsWith(SEGMENT_MAP_ENTRY_PREFIX);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to