This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5606b510bdb MSE ErrorOperator (#16257)
5606b510bdb is described below

commit 5606b510bdb787f4465a1c8e2ed5789530be973d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Jul 17 16:10:04 2025 +0200

    MSE ErrorOperator (#16257)
---
 .../tests/MultiStageEngineIntegrationTest.java     |  26 ++++
 .../query/runtime/operator/ErrorOperator.java      | 102 +++++++++++++
 .../query/runtime/plan/PlanNodeToOpChain.java      | 162 ++++++++++++++-------
 .../service/dispatch/QueryDispatcherTest.java      |   7 +-
 4 files changed, 244 insertions(+), 53 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 80b76b15157..4b0b97b51b7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -56,6 +56,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.assertj.core.api.Assertions;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.testng.Assert;
@@ -375,6 +376,33 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     assertEquals(oneHourAgoTodayStr, expectedOneHourAgoTodayStr);
   }
 
+  /// Verifies that a function the server cannot execute on an intermediate stage makes the query fail fast with a
+  /// descriptive error instead of timing out.
+  @Test
+  public void testUnsupportedUdfOnIntermediateStage()
+      throws Exception {
+    String sqlQuery = ""
+        + "SET timeoutMs=10000;\n" // In older versions we timeout in this case, but we should fail fast now
+        + "WITH fakeTable AS (\n" // this table is used to make sure the call is made on an intermediate stage
+        + "  SELECT \n"
+        + "    t1.DaysSinceEpoch + t2.DaysSinceEpoch as DaysSinceEpoch \n"
+        + "  FROM (select * from mytable limit 1) AS t1 \n"
+        + "  CROSS JOIN (select * from mytable limit 1) AS t2 \n"
+        + ")\n"
+        + "SELECT \n"
+        // arrayMax is not supported on intermediate stages. Broker doesn't know that, so this produces an error
+        // when the query is received on the server
+        + "  arrayMax(ARRAY[DaysSinceEpoch]) \n"
+        + "FROM fakeTable \n";
+    JsonNode response = postQuery(sqlQuery);
+    Assertions.assertThat(response.get("exceptions"))
+        .describedAs("Expected exception for unsupported projection")
+        .isNotEmpty();
+    Assertions.assertThat(response.get("exceptions").get(0).get("message").asText())
+        .describedAs("Expected exception message for unsupported projection")
+        .contains("Unsupported function: ARRAYMAX");
+  }
+
   @Test
   public void testRegexpReplace()
       throws Exception {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
new file mode 100644
index 00000000000..fbe9c8f99a3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// An operator that emits an error block with a specific error code and message.
+///
+/// This operator is not produced by any planner rule. Instead, it is created when an error is found at a point
+/// where it is too late to throw an exception. This happens, for example, when a server receives a query as a
+/// [org.apache.pinot.query.planner.plannode.PlanNode] tree that, for whatever reason (e.g. it uses a function
+/// that is unknown to the server), cannot be transformed into an actual operator. At that point it is too late to
+/// throw an exception because the server dispatcher is async and the only way to communicate the error is through
+/// the mailbox mechanism. Therefore we create an `ErrorOperator` that emits an error block with the error code and
+/// message, aborting the query execution in a controlled way.
+///
+/// Operators that were already built before the failure may be attached as children. They are exposed through
+/// [#getChildOperators()] so they stay part of the operator tree.
+public class ErrorOperator extends MultiStageOperator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ErrorOperator.class);
+  private static final String EXPLAIN_NAME = "ERROR";
+  private final QueryErrorCode _errorCode;
+  private final String _errorMessage;
+  // NOTE(review): reuses LiteralValueOperator's stat keys (and Type.LITERAL below), presumably because there is
+  // no dedicated operator type for errors yet. Confirm before relying on these stats.
+  private final StatMap<LiteralValueOperator.StatKey> _statMap = new StatMap<>(LiteralValueOperator.StatKey.class);
+  private final List<MultiStageOperator> _childOperators;
+
+  /// Creates an error operator without children.
+  public ErrorOperator(OpChainExecutionContext context, QueryErrorCode errorCode, String errorMessage) {
+    this(context, errorCode, errorMessage, List.of());
+  }
+
+  /// Creates an error operator with at most one child; a `null` child is ignored.
+  public ErrorOperator(OpChainExecutionContext context, QueryErrorCode errorCode, String errorMessage,
+      @Nullable MultiStageOperator childOperator) {
+    this(context, errorCode, errorMessage, childOperator == null ? List.of() : List.of(childOperator));
+  }
+
+  /// Creates an error operator with the given children.
+  ///
+  /// @param childOperators operators that were already built before the failure. The list is copied, so later
+  ///     changes to it are not observed. It must not contain `null` elements.
+  public ErrorOperator(OpChainExecutionContext context, QueryErrorCode errorCode, String errorMessage,
+      List<MultiStageOperator> childOperators) {
+    super(context);
+    _errorCode = errorCode;
+    _errorMessage = errorMessage;
+    _childOperators = List.copyOf(childOperators);
+  }
+
+  /// Returns a new error block built from the configured error code and message on every call.
+  @Override
+  protected MseBlock getNextBlock() {
+    return ErrorMseBlock.fromError(_errorCode, _errorMessage);
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected Logger logger() {
+    return LOGGER;
+  }
+
+  @Override
+  public void registerExecution(long time, int numRows) {
+    _statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
+    _statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
+  }
+
+  @Override
+  public Type getOperatorType() {
+    return Type.LITERAL;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return _childOperators;
+  }
+
+  @Override
+  protected StatMap<?> copyStatMaps() {
+    return new StatMap<>(_statMap);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index c08cb893599..a1ef3c9571b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -20,7 +20,10 @@ package org.apache.pinot.query.runtime.plan;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
 import org.apache.pinot.query.planner.plannode.ExplainedNode;
@@ -38,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.AsofJoinOperator;
+import org.apache.pinot.query.runtime.operator.ErrorOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.IntersectAllOperator;
@@ -58,6 +62,7 @@ import org.apache.pinot.query.runtime.operator.TransformOperator;
 import org.apache.pinot.query.runtime.operator.UnionOperator;
 import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 
 
 /**
@@ -118,10 +123,14 @@ public class PlanNodeToOpChain {
 
     @Override
     public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) {
-      if (node.isSort()) {
-        return new SortedMailboxReceiveOperator(context, node);
-      } else {
-        return new MailboxReceiveOperator(context, node);
+      try {
+        if (node.isSort()) {
+          return new SortedMailboxReceiveOperator(context, node);
+        } else {
+          return new MailboxReceiveOperator(context, node);
+        }
+      } catch (Exception e) {
+        return createErrorOperator(context, e);
       }
     }
 
@@ -132,96 +141,171 @@ public class PlanNodeToOpChain {
 
     @Override
     public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) {
-      return new AggregateOperator(context, visit(node.getInputs().get(0), context), node);
+      MultiStageOperator child = null;
+      try {
+        child = visit(node.getInputs().get(0), context);
+        return new AggregateOperator(context, child, node);
+      } catch (Exception e) {
+        return createErrorOperator(context, e, child);
+      }
     }
 
     @Override
     public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) {
-      PlanNode input = node.getInputs().get(0);
-      return new WindowAggregateOperator(context, visit(input, context), input.getDataSchema(), node);
+      MultiStageOperator child = null;
+      try {
+        PlanNode input = node.getInputs().get(0);
+        child = visit(input, context);
+        return new WindowAggregateOperator(context, child, input.getDataSchema(), node);
+      } catch (Exception e) {
+        return createErrorOperator(context, e, child);
+      }
     }
 
     @Override
     public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) {
       List<MultiStageOperator> inputOperators = new ArrayList<>(setOpNode.getInputs().size());
-      for (PlanNode input : setOpNode.getInputs()) {
-        inputOperators.add(visit(input, context));
-      }
-      switch (setOpNode.getSetOpType()) {
-        case UNION:
-          return new UnionOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
-        case INTERSECT:
-          return setOpNode.isAll() ? new IntersectAllOperator(context, inputOperators,
-              setOpNode.getInputs().get(0).getDataSchema())
-              : new IntersectOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
-        case MINUS:
-          return setOpNode.isAll() ? new MinusAllOperator(context, inputOperators,
-              setOpNode.getInputs().get(0).getDataSchema())
-              : new MinusOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
-        default:
-          throw new IllegalStateException("Unsupported SetOpType: " + setOpNode.getSetOpType());
+      try {
+        for (PlanNode input : setOpNode.getInputs()) {
+          inputOperators.add(visit(input, context));
+        }
+        switch (setOpNode.getSetOpType()) {
+          case UNION:
+            return new UnionOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+          case INTERSECT:
+            return setOpNode.isAll() ? new IntersectAllOperator(context, inputOperators,
+                setOpNode.getInputs().get(0).getDataSchema())
+                : new IntersectOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+          case MINUS:
+            return setOpNode.isAll() ? new MinusAllOperator(context, inputOperators,
+                setOpNode.getInputs().get(0).getDataSchema())
+                : new MinusOperator(context, inputOperators, setOpNode.getInputs().get(0).getDataSchema());
+          default:
+            throw new IllegalStateException("Unsupported SetOpType: " + setOpNode.getSetOpType());
+        }
+      } catch (Exception e) {
+        return createErrorOperator(context, e, inputOperators);
       }
     }
 
     @Override
     public MultiStageOperator visitExchange(ExchangeNode exchangeNode, OpChainExecutionContext context) {
-      throw new UnsupportedOperationException("ExchangeNode should not be visited");
+      return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, "ExchangeNode should not be visited");
     }
 
     @Override
     public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) {
-      return new FilterOperator(context, visit(node.getInputs().get(0), context), node);
+      MultiStageOperator child = null;
+      try {
+        child = visit(node.getInputs().get(0), context);
+        return new FilterOperator(context, child, node);
+      } catch (Exception e) {
+        return createErrorOperator(context, e, child);
+      }
     }
 
     @Override
     public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext context) {
-      List<PlanNode> inputs = node.getInputs();
-      PlanNode left = inputs.get(0);
-      MultiStageOperator leftOperator = visit(left, context);
-      PlanNode right = inputs.get(1);
-      MultiStageOperator rightOperator = visit(right, context);
-      JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy();
-      switch (joinStrategy) {
-        case HASH:
-          if (node.getLeftKeys().isEmpty()) {
-            // TODO: Consider adding non-equi as a separate join strategy.
-            return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
-          } else {
-            return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
-          }
-        case LOOKUP:
-          return new LookupJoinOperator(context, leftOperator, rightOperator, node);
-        case ASOF:
-          return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
-        default:
-          throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
+      MultiStageOperator leftOperator = null;
+      MultiStageOperator rightOperator = null;
+      try {
+        List<PlanNode> inputs = node.getInputs();
+        PlanNode left = inputs.get(0);
+        leftOperator = visit(left, context);
+
+        PlanNode right = inputs.get(1);
+        rightOperator = visit(right, context);
+
+        JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy();
+        switch (joinStrategy) {
+          case HASH:
+            if (node.getLeftKeys().isEmpty()) {
+              // TODO: Consider adding non-equi as a separate join strategy.
+              return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+            } else {
+              return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+            }
+          case LOOKUP:
+            return new LookupJoinOperator(context, leftOperator, rightOperator, node);
+          case ASOF:
+            return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+          default:
+            throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
+        }
+      } catch (Exception e) {
+        return createErrorOperator(context, e, leftOperator, rightOperator);
       }
     }
 
     @Override
     public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) {
-      PlanNode input = node.getInputs().get(0);
-      return new TransformOperator(context, visit(input, context), input.getDataSchema(), node);
+      MultiStageOperator child = null;
+      try {
+        PlanNode input = node.getInputs().get(0);
+        child = visit(input, context);
+        return new TransformOperator(context, child, input.getDataSchema(), node);
+      } catch (Exception e) {
+        return createErrorOperator(context, e, child);
+      }
     }
 
     @Override
     public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) {
-      return new SortOperator(context, visit(node.getInputs().get(0), context), node);
+      MultiStageOperator child = null;
+      try {
+        child = visit(node.getInputs().get(0), context);
+        return new SortOperator(context, child, node);
+      } catch (Exception e) {
+        return createErrorOperator(context, e, child);
+      }
     }
 
     @Override
     public MultiStageOperator visitTableScan(TableScanNode node, OpChainExecutionContext context) {
-      throw new UnsupportedOperationException("Plan node of type TableScanNode is not supported!");
+      return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION,
+          "Plan node of type TableScanNode is not supported in OpChain execution.");
     }
 
     @Override
     public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext context) {
-      return new LiteralValueOperator(context, node);
+      try {
+        return new LiteralValueOperator(context, node);
+      } catch (Exception e) {
+        return createErrorOperator(context, e);
+      }
     }
 
     @Override
     public MultiStageOperator visitExplained(ExplainedNode node, OpChainExecutionContext context) {
-      throw new UnsupportedOperationException("Plan node of type ExplainedNode is not supported!");
+      return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION,
+          "Plan node of type ExplainedNode is not supported in OpChain execution.");
     }
+
+    /**
+     * Builds an {@link ErrorOperator} for a failure raised while converting a plan node into an operator, attaching
+     * the child operators that were already built. {@code null} children (not built yet) are skipped.
+     */
+    private MultiStageOperator createErrorOperator(OpChainExecutionContext context, Exception e,
+        MultiStageOperator... children) {
+      List<MultiStageOperator> nonNullChildren = Stream.of(children)
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
+      return createErrorOperator(context, e, nonNullChildren);
+    }
+
+    /**
+     * Builds an {@link ErrorOperator} with {@link QueryErrorCode#QUERY_EXECUTION} for a failure raised while
+     * converting a plan node into an operator, attaching the given (non-null) child operators.
+     */
+    private MultiStageOperator createErrorOperator(OpChainExecutionContext context, Exception e,
+        List<MultiStageOperator> children) {
+      String message = e.getMessage();
+      if (message == null) {
+        // Some exceptions (e.g. a bare NullPointerException) carry no message; fall back to the exception's
+        // string form so the error block always describes the failure.
+        message = e.toString();
+      }
+      return new ErrorOperator(context, QueryErrorCode.QUERY_EXECUTION, message, children);
+    }
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 3bea4ef650b..36aa6b0f231 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -164,8 +164,11 @@ public class QueryDispatcherTest extends QueryTestSet {
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     try {
-      // will throw b/c mailboxService is mocked
-      _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap());
-      Assert.fail("Method call above should have failed");
+      // mailboxService is mocked, so the call either throws an NPE or returns a result with a processing exception
+      QueryDispatcher.QueryResult queryResult = _queryDispatcher.submitAndReduce(context, dispatchableSubPlan,
+          10_000L, Collections.emptyMap());
+      if (queryResult.getProcessingException() == null) {
+        Assert.fail("Method call above should have failed");
+      }
     } catch (NullPointerException e) {
       // Expected
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to