This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5606b510bdb MSE ErrorOperator (#16257)
5606b510bdb is described below
commit 5606b510bdb787f4465a1c8e2ed5789530be973d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Jul 17 16:10:04 2025 +0200
MSE ErrorOperator (#16257)
---
.../tests/MultiStageEngineIntegrationTest.java | 26 ++++
.../query/runtime/operator/ErrorOperator.java | 102 +++++++++++++
.../query/runtime/plan/PlanNodeToOpChain.java | 162 ++++++++++++++-------
.../service/dispatch/QueryDispatcherTest.java | 7 +-
4 files changed, 244 insertions(+), 53 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 80b76b15157..4b0b97b51b7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -56,6 +56,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
+import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.testng.Assert;
@@ -375,6 +376,31 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
assertEquals(oneHourAgoTodayStr, expectedOneHourAgoTodayStr);
}
+ @Test
+ public void testUnsupportedUdfOnIntermediateStage()
+ throws Exception {
+ String sqlQuery = ""
+ + "SET timeoutMs=10000;\n" // In older versions we timeout in this case, but we should fail fast now
+ + "WITH fakeTable AS (\n" // this table is used to make sure the call is made on an intermediate stage
+ + " SELECT \n"
+ + " t1.DaysSinceEpoch + t2.DaysSinceEpoch as DaysSinceEpoch"
+ + " FROM (select * from mytable limit 1) AS t1 \n"
+ + " CROSS JOIN (select * from mytable limit 1) AS t2 \n"
+ + ")\n"
+ + "SELECT \n"
+ // arrayMax is not supported on intermediate stages. Broker doesn't know that, so this produces an error
+ // when the query is received on the server
+ + " arrayMax(ARRAY[DaysSinceEpoch]) \n"
+ + "FROM fakeTable \n";
+ JsonNode response = postQuery(sqlQuery);
+ Assertions.assertThat(response.get("exceptions"))
+ .describedAs("Expected exception for unsupported projection")
+ .isNotEmpty();
+ Assertions.assertThat(response.get("exceptions").get(0).get("message").asText())
+ .describedAs("Expected exception message for unsupported projection")
+ .contains("Unsupported function: ARRAYMAX");
+ }
+
@Test
public void testRegexpReplace()
throws Exception {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
new file mode 100644
index 00000000000..fbe9c8f99a3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// An operator that emits an error block with a specific error code and message.
+///
+/// This operator is not produced by any planner rule but instead when we find an error and it is too late to throw an
+/// exception. This for example happens when servers receive a query in
+/// [org.apache.pinot.query.planner.plannode.PlanNode] and for whatever reason (ie uses a function that is unknown for
+/// the server) it cannot be transformed into an actual operator. At that time it is too late to throw an exception
+/// because the server dispatcher is async and the only way to communicate the error is through the mailbox mechanism.
+/// Therefore we create an `ErrorOperator` that emits an error block with the error code and message, aborting the query
+/// execution in a controlled way.
+public class ErrorOperator extends MultiStageOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ErrorOperator.class);
+ private static final String EXPLAIN_NAME = "ERROR";
+ private final QueryErrorCode _errorCode;
+ private final String _errorMessage;
+ private final StatMap<LiteralValueOperator.StatKey> _statMap = new StatMap<>(LiteralValueOperator.StatKey.class);
+ private final List<MultiStageOperator> _childOperators;
+
+
+ public ErrorOperator(OpChainExecutionContext context, QueryErrorCode errorCode,
+ String errorMessage) {
+ this(context, errorCode, errorMessage, List.of());
+ }
+
+ public ErrorOperator(OpChainExecutionContext context, QueryErrorCode errorCode,
+ String errorMessage, @Nullable MultiStageOperator childOperator) {
+ this(context, errorCode, errorMessage, childOperator == null ? List.of() : List.of(childOperator));
+ }
+
+ public ErrorOperator(OpChainExecutionContext context, QueryErrorCode errorCode,
+ String errorMessage, List<MultiStageOperator> childOperators) {
+ super(context);
+ _errorCode = errorCode;
+ _errorMessage = errorMessage;
+ _childOperators = childOperators;
+ }
+
+ @Override
+ protected MseBlock getNextBlock() {
+ return ErrorMseBlock.fromError(_errorCode, _errorMessage);
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected Logger logger() {
+ return LOGGER;
+ }
+
+ @Override
+ public void registerExecution(long time, int numRows) {
+ _statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
+ _statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
+ }
+
+ @Override
+ public Type getOperatorType() {
+ return Type.LITERAL;
+ }
+
+ @Override
+ public List<MultiStageOperator> getChildOperators() {
+ return _childOperators;
+ }
+
+ protected StatMap<?> copyStatMaps() {
+ return new StatMap<>(_statMap);
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index c08cb893599..a1ef3c9571b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -20,7 +20,10 @@ package org.apache.pinot.query.runtime.plan;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.ExplainedNode;
@@ -38,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.AsofJoinOperator;
+import org.apache.pinot.query.runtime.operator.ErrorOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.IntersectAllOperator;
@@ -58,6 +62,7 @@ import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.operator.UnionOperator;
import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
/**
@@ -118,10 +123,14 @@ public class PlanNodeToOpChain {
@Override
public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) {
- if (node.isSort()) {
- return new SortedMailboxReceiveOperator(context, node);
- } else {
- return new MailboxReceiveOperator(context, node);
+ try {
+ if (node.isSort()) {
+ return new SortedMailboxReceiveOperator(context, node);
+ } else {
+ return new MailboxReceiveOperator(context, node);
+ }
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage());
}
}
@@ -132,96 +141,147 @@ public class PlanNodeToOpChain {
@Override
public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) {
- return new AggregateOperator(context, visit(node.getInputs().get(0), context), node);
+ MultiStageOperator child = null;
+ try {
+ child = visit(node.getInputs().get(0), context);
+ return new AggregateOperator(context, child, node);
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), child);
+ }
}
@Override
public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) {
- PlanNode input = node.getInputs().get(0);
- return new WindowAggregateOperator(context, visit(input, context), input.getDataSchema(), node);
+ MultiStageOperator child = null;
+ try {
+ PlanNode input = node.getInputs().get(0);
+ child = visit(input, context);
+ return new WindowAggregateOperator(context, child, input.getDataSchema(), node);
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), child);
+ }
}
@Override
public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) {
List<MultiStageOperator> inputOperators = new ArrayList<>(setOpNode.getInputs().size());
- for (PlanNode input : setOpNode.getInputs()) {
- inputOperators.add(visit(input, context));
- }
- switch (setOpNode.getSetOpType()) {
- case UNION:
- return new UnionOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
- case INTERSECT:
- return setOpNode.isAll() ? new IntersectAllOperator(context, inputOperators,
- setOpNode.getInputs().get(0).getDataSchema())
- : new IntersectOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
- case MINUS:
- return setOpNode.isAll() ? new MinusAllOperator(context, inputOperators,
- setOpNode.getInputs().get(0).getDataSchema())
- : new MinusOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
- default:
- throw new IllegalStateException("Unsupported SetOpType: " + setOpNode.getSetOpType());
+ try {
+ for (PlanNode input : setOpNode.getInputs()) {
+ inputOperators.add(visit(input, context));
+ }
+ switch (setOpNode.getSetOpType()) {
+ case UNION:
+ return new UnionOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+ case INTERSECT:
+ return setOpNode.isAll() ? new IntersectAllOperator(context, inputOperators,
+ setOpNode.getInputs().get(0).getDataSchema())
+ : new IntersectOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+ case MINUS:
+ return setOpNode.isAll() ? new MinusAllOperator(context, inputOperators,
+ setOpNode.getInputs().get(0).getDataSchema())
+ : new MinusOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+ default:
+ throw new IllegalStateException("Unsupported SetOpType: " + setOpNode.getSetOpType());
+ }
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), inputOperators);
}
}
@Override
public MultiStageOperator visitExchange(ExchangeNode exchangeNode, OpChainExecutionContext context) {
- throw new UnsupportedOperationException("ExchangeNode should not be visited");
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, "ExchangeNode should not be visited");
}
@Override
public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) {
- return new FilterOperator(context, visit(node.getInputs().get(0), context), node);
+ MultiStageOperator child = null;
+ try {
+ child = visit(node.getInputs().get(0), context);
+ return new FilterOperator(context, child, node);
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), child);
+ }
}
@Override
public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext context) {
- List<PlanNode> inputs = node.getInputs();
- PlanNode left = inputs.get(0);
- MultiStageOperator leftOperator = visit(left, context);
- PlanNode right = inputs.get(1);
- MultiStageOperator rightOperator = visit(right, context);
- JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy();
- switch (joinStrategy) {
- case HASH:
- if (node.getLeftKeys().isEmpty()) {
- // TODO: Consider adding non-equi as a separate join strategy.
- return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
- } else {
- return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
- }
- case LOOKUP:
- return new LookupJoinOperator(context, leftOperator, rightOperator, node);
- case ASOF:
- return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
- default:
- throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
+ MultiStageOperator leftOperator = null;
+ MultiStageOperator rightOperator = null;
+ try {
+ List<PlanNode> inputs = node.getInputs();
+ PlanNode left = inputs.get(0);
+ leftOperator = visit(left, context);
+
+ PlanNode right = inputs.get(1);
+ rightOperator = visit(right, context);
+
+ JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy();
+ switch (joinStrategy) {
+ case HASH:
+ if (node.getLeftKeys().isEmpty()) {
+ // TODO: Consider adding non-equi as a separate join strategy.
+ return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+ } else {
+ return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+ }
+ case LOOKUP:
+ return new LookupJoinOperator(context, leftOperator, rightOperator, node);
+ case ASOF:
+ return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+ default:
+ throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
+ }
+ } catch (Exception e) {
+ List<MultiStageOperator> children = Stream.of(leftOperator, rightOperator)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), children);
}
}
@Override
public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) {
- PlanNode input = node.getInputs().get(0);
- return new TransformOperator(context, visit(input, context), input.getDataSchema(), node);
+ MultiStageOperator child = null;
+ try {
+ PlanNode input = node.getInputs().get(0);
+ child = visit(input, context);
+ return new TransformOperator(context, child, input.getDataSchema(), node);
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), child);
+ }
}
@Override
public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) {
- return new SortOperator(context, visit(node.getInputs().get(0), context), node);
+ MultiStageOperator child = null;
+ try {
+ child = visit(node.getInputs().get(0), context);
+ return new SortOperator(context, child, node);
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage(), child);
+ }
}
@Override
public MultiStageOperator visitTableScan(TableScanNode node, OpChainExecutionContext context) {
- throw new UnsupportedOperationException("Plan node of type TableScanNode is not supported!");
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION,
+ "Plan node of type TableScanNode is not supported in OpChain execution.");
}
@Override
public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext context) {
- return new LiteralValueOperator(context, node);
+ try {
+ return new LiteralValueOperator(context, node);
+ } catch (Exception e) {
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, e.getMessage());
+ }
}
@Override
public MultiStageOperator visitExplained(ExplainedNode node, OpChainExecutionContext context) {
- throw new UnsupportedOperationException("Plan node of type ExplainedNode is not supported!");
+ return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION,
+ "Plan node of type ExplainedNode is not supported in OpChain execution.");
}
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 3bea4ef650b..36aa6b0f231 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -164,8 +164,11 @@ public class QueryDispatcherTest extends QueryTestSet {
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
try {
// will throw b/c mailboxService is mocked
- _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap());
- Assert.fail("Method call above should have failed");
+ QueryDispatcher.QueryResult queryResult = _queryDispatcher.submitAndReduce(context, dispatchableSubPlan,
+ 10_000L, Collections.emptyMap());
+ if (queryResult.getProcessingException() == null) {
+ Assert.fail("Method call above should have failed");
+ }
} catch (NullPointerException e) {
// Expected
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]