This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b16cdece24 [multistage] improve dispatch exception handling (#11688)
b16cdece24 is described below
commit b16cdece2408f926e927db855b45396d8fcc739e
Author: Rong Rong <[email protected]>
AuthorDate: Wed Sep 27 13:47:35 2023 -0700
[multistage] improve dispatch exception handling (#11688)
* fix issue where test runner errors were not propagated
* remove SubmissionService and use CompletableFuture directly as the
abstraction
---------
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/query/service/SubmissionService.java | 59 ----------------------
.../pinot/query/service/server/QueryServer.java | 33 ++++++++----
.../apache/pinot/query/QueryServerEnclosure.java | 16 ++----
.../query/runtime/queries/QueryRunnerTest.java | 41 ++++-----------
.../query/runtime/queries/QueryRunnerTestBase.java | 29 +++++++++--
5 files changed, 63 insertions(+), 115 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java
deleted file mode 100644
index b42c1d831d..0000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.service;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * Submission service is used to submit multiple runnables and checks the
result upon all {@link Future} returns
- * or any failure occurs.
- */
-public class SubmissionService {
- private final ExecutorService _executor;
- private final List<CompletableFuture<Void>> _futures = new ArrayList<>();
-
- public SubmissionService(ExecutorService executor) {
- _executor = executor;
- }
-
- public void submit(Runnable runnable) {
- _futures.add(CompletableFuture.runAsync(runnable, _executor));
- }
-
- public void awaitFinish(long deadlineMs)
- throws Exception {
- CompletableFuture<Void> completableFuture =
CompletableFuture.allOf(_futures.toArray(new CompletableFuture[]{}));
- try {
- completableFuture.get(deadlineMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- } finally {
- // Cancel all ongoing submission
- for (CompletableFuture<Void> future : _futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- }
- }
-}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index a90418c742..281f434d30 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -25,8 +25,10 @@ import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
@@ -35,7 +37,6 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
-import org.apache.pinot.query.service.SubmissionService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -112,21 +113,31 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad
request").withCause(e).asException());
return;
}
- // 2. Submit distributed stage plans
- SubmissionService submissionService = new
SubmissionService(_querySubmissionExecutorService);
- distributedStagePlans.forEach(distributedStagePlan ->
submissionService.submit(() -> {
- _queryRunner.processQuery(distributedStagePlan, requestMetadata);
- }));
- // 3. await response successful or any failure which cancels all other
tasks.
+ // 2. Submit distributed stage plans, await response successful or any
failure which cancels all other tasks.
+ int numSubmission = distributedStagePlans.size();
+ CompletableFuture<?>[] submissionStubs = new
CompletableFuture[numSubmission];
+ for (int i = 0; i < numSubmission; i++) {
+ DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
+ submissionStubs[i] =
+ CompletableFuture.runAsync(() ->
_queryRunner.processQuery(distributedStagePlan, requestMetadata),
+ _querySubmissionExecutorService);
+ }
try {
- submissionService.awaitFinish(deadlineMs);
- } catch (Throwable t) {
- LOGGER.error("error occurred during stage submission for {}:\n{}",
requestId, t);
+ CompletableFuture.allOf(submissionStubs).get(deadlineMs -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOGGER.error("error occurred during stage submission for {}:\n{}",
requestId, e);
responseObserver.onNext(Worker.QueryResponse.newBuilder()
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
- QueryException.getTruncatedStackTrace(t)).build());
+ QueryException.getTruncatedStackTrace(e)).build());
responseObserver.onCompleted();
return;
+ } finally {
+ // Cancel all ongoing submission
+ for (CompletableFuture<?> future : submissionStubs) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
}
responseObserver.onNext(
Worker.QueryResponse.newBuilder().putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK,
"")
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 8038898fa2..b4b1dff3cc 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -108,16 +109,9 @@ public class QueryServerEnclosure {
_queryRunner.shutDown();
}
- public void processQuery(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap) {
- _queryRunner.getExecutorService().submit(() -> {
- try {
- _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
- } catch (Exception e) {
- // TODO: Find a way to propagate the exception and fail the test
- System.err.println("Caught exception while executing query");
- e.printStackTrace(System.err);
- throw new RuntimeException("Error executing query!", e);
- }
- });
+ public CompletableFuture<Void> processQuery(DistributedStagePlan
distributedStagePlan,
+ Map<String, String> requestMetadataMap) {
+ return CompletableFuture.runAsync(() ->
_queryRunner.processQuery(distributedStagePlan, requestMetadataMap),
+ _queryRunner.getExecutorService());
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 1d8646399a..d54eb5b1d0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -20,21 +20,16 @@ package org.apache.pinot.query.runtime.queries;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import java.util.stream.Collectors;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.DispatchablePlanFragment;
-import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.config.table.TableType;
@@ -44,8 +39,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -181,34 +174,22 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
*/
@Test(dataProvider = "testDataWithSqlExecutionExceptions")
public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
- long requestId = REQUEST_ID_GEN.getAndIncrement();
- SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sql);
- QueryEnvironment.QueryPlannerResult queryPlannerResult =
- _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
- DispatchableSubPlan dispatchableSubPlan =
queryPlannerResult.getQueryPlan();
- Map<String, String> requestMetadataMap = new HashMap<>();
-
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
String.valueOf(requestId));
- Long timeoutMsInQueryOption =
QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
- long timeoutMs =
- timeoutMsInQueryOption != null ? timeoutMsInQueryOption :
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS;
-
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
String.valueOf(timeoutMs));
-
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"true");
- requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
- List<DispatchablePlanFragment> stagePlans =
dispatchableSubPlan.getQueryStageList();
- for (int stageId = 1; stageId < stagePlans.size(); stageId++) {
- processDistributedStagePlans(dispatchableSubPlan, stageId,
requestMetadataMap);
- }
try {
- QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs,
Collections.emptyMap(), null,
- _mailboxService);
- Assert.fail("Should have thrown exception!");
- } catch (RuntimeException e) {
+ // query pinot
+ List<Object[]> resultRows = queryRunner(sql, null);
+ Assert.fail(
+ "Expected error with message '" + exceptionMsg + "'. But instead
rows were returned: " + resultRows.stream()
+ .map(Arrays::toString).collect(Collectors.joining(",\n")));
+ } catch (Exception e) {
// NOTE: The actual message is (usually) something like:
// Received error query execution result block:
{200=QueryExecutionError:
// Query execution error on: Server_localhost_12345
// java.lang.IllegalArgumentException: Illegal Json Path: $['path']
does not match document
String exceptionMessage = e.getMessage();
- Assert.assertTrue(exceptionMessage.startsWith("Received error query
execution result block: "));
+ Assert.assertTrue(
+ exceptionMessage.startsWith("Received error query execution result
block: ") || exceptionMessage.startsWith(
+ "Error occurred during stage submission"),
+ "Exception message didn't start with proper heading: " +
exceptionMessage);
Assert.assertTrue(exceptionMessage.contains(exceptionMsg),
"Exception should contain: " + exceptionMsg + ", but found: " +
exceptionMessage);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index a79880e5a5..0c422f3962 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -36,10 +36,13 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
@@ -112,33 +115,51 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true");
}
+ // Submission Stub logic are mimic {@link QueryServer}
List<DispatchablePlanFragment> stagePlans =
dispatchableSubPlan.getQueryStageList();
+ List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
for (int stageId = 0; stageId < stagePlans.size(); stageId++) {
if (stageId != 0) {
- processDistributedStagePlans(dispatchableSubPlan, stageId,
requestMetadataMap);
+
submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan,
stageId, requestMetadataMap));
}
if (executionStatsAggregatorMap != null) {
executionStatsAggregatorMap.put(stageId, new
ExecutionStatsAggregator(true));
}
}
+ try {
+ CompletableFuture.allOf(submissionStubs.toArray(new
CompletableFuture[0])).get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ // wrap and throw the exception here is for assert purpose on
dispatch-time error
+ throw new RuntimeException("Error occurred during stage submission: " +
QueryException.getTruncatedStackTrace(e));
+ } finally {
+ // Cancel all ongoing submission
+ for (CompletableFuture<?> future : submissionStubs) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ }
+ // exception will be propagated through for assert purpose on runtime error
ResultTable resultTable =
QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs,
Collections.emptyMap(),
executionStatsAggregatorMap, _mailboxService);
return resultTable.getRows();
}
- protected void processDistributedStagePlans(DispatchableSubPlan
dispatchableSubPlan, int stageId,
- Map<String, String> requestMetadataMap) {
+ protected List<CompletableFuture<?>>
processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan,
+ int stageId, Map<String, String> requestMetadataMap) {
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap();
+ List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
for (Map.Entry<QueryServerInstance, List<Integer>> entry :
serverInstanceToWorkerIdMap.entrySet()) {
QueryServerInstance server = entry.getKey();
for (int workerId : entry.getValue()) {
DistributedStagePlan distributedStagePlan =
constructDistributedStagePlan(dispatchableSubPlan, stageId, new
VirtualServerAddress(server, workerId));
- _servers.get(server).processQuery(distributedStagePlan,
requestMetadataMap);
+
submissionStubs.add(_servers.get(server).processQuery(distributedStagePlan,
requestMetadataMap));
}
}
+ return submissionStubs;
}
protected static DistributedStagePlan
constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]