yashmayya commented on code in PR #17009:
URL: https://github.com/apache/pinot/pull/17009#discussion_r2430078712


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -214,21 +214,28 @@ protected MseBlock getNextBlock() {
         sendEos((MseBlock.Eos) block);
       } else {
         sendMseBlock(((MseBlock.Data) block));
+        checkTerminationAndSampleUsage();
       }
-      checkTerminationAndSampleUsage();
       return block;
-    } catch (QueryCancelledException e) {
-      LOGGER.debug("Query was cancelled for opChain: {}", _context.getId());
-      return SuccessMseBlock.INSTANCE;
-    } catch (TerminationException e) {
-      LOGGER.info("Query was terminated for opChain: {}", _context.getId(), e);
-      return ErrorMseBlock.fromException(e);
-    } catch (QueryException e) {
-      return ErrorMseBlock.fromException(e);
     } catch (RuntimeException e) {
-      ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
-      try {
+      if (e instanceof QueryCancelledException) {
+        LOGGER.debug("Query was cancelled for opChain: {}", _context.getId());
+        return SuccessMseBlock.INSTANCE;

Review Comment:
   Why do we return `SuccessMseBlock` in this case?
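   To make the concern concrete, here is a minimal sketch of what a caller sees. It assumes that whoever consumes the returned block only inspects its type, which may not match the real opchain driver. All class names are hypothetical stand-ins, not Pinot's `MseBlock` hierarchy. A success end-of-stream makes a cancelled operator look like one that finished normally. An error end-of-stream surfaces the cancellation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the multi-stage engine's blocks.
// These are NOT Pinot's MseBlock, SuccessMseBlock or ErrorMseBlock classes.
abstract class SketchBlock {
}

final class SketchDataBlock extends SketchBlock {
  final List<Object[]> rows;

  SketchDataBlock(List<Object[]> rows) {
    this.rows = rows;
  }
}

final class SketchSuccessEos extends SketchBlock {
}

final class SketchErrorEos extends SketchBlock {
  final String message;

  SketchErrorEos(String message) {
    this.message = message;
  }
}

// A toy caller that only looks at the blocks it receives.
final class SketchConsumer {
  private SketchConsumer() {
  }

  // Collects rows until an end-of-stream block arrives; throws if the EOS carries an error.
  static List<Object[]> drain(List<SketchBlock> blocks) {
    List<Object[]> collected = new ArrayList<>();
    for (SketchBlock block : blocks) {
      if (block instanceof SketchDataBlock) {
        collected.addAll(((SketchDataBlock) block).rows);
      } else if (block instanceof SketchErrorEos) {
        throw new IllegalStateException(((SketchErrorEos) block).message);
      } else {
        // Success EOS: a cancelled sender is indistinguishable from one that finished,
        // so whatever rows were collected are returned as if they were complete.
        return collected;
      }
    }
    return collected;
  }
}
```

   If cancellation is only ever triggered when the downstream no longer needs data, the success block may be intentional. In that case, a code comment saying so would help.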



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -201,23 +198,31 @@ private byte[] serializeResponse(ServerQueryRequest queryRequest, InstanceRespon
     byte[] responseByte = null;
     try {
       responseByte = instanceResponse.toDataTable().toBytes();
-    } catch (EarlyTerminationException e) {
-      QueryException terminateException = QueryThreadContext.getTerminateException();
-      String userMsg = "Cancelled while building data table" + (terminateException != null ? ": " + terminateException
-          : StringUtils.EMPTY);
-      LOGGER.error(userMsg);
-      QueryErrorMessage errMsg = QueryErrorMessage.safeMsg(QueryErrorCode.QUERY_CANCELLATION, userMsg);
-      Map<String, String> queryOptions = queryRequest.getQueryContext().getQueryOptions();
-      String workloadName = QueryOptionsUtils.getWorkloadName(queryOptions);
-      instanceResponse = new InstanceResponseBlock(new ExceptionResultsBlock(errMsg));
-      instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId()));
-      instanceResponse.addMetadata(MetadataKey.QUERY_ID.getName(), queryRequest.getCid());
-      instanceResponse.addMetadata(MetadataKey.WORKLOAD_NAME.getName(), workloadName);
-      return serializeResponse(queryRequest, instanceResponse);
     } catch (Exception e) {
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
-      LOGGER.error("Caught exception while serializing response for requestId: {}, brokerId: {}",
-          queryRequest.getRequestId(), queryRequest.getBrokerId(), e);
+      long requestId = queryRequest.getRequestId();
+      String brokerId = queryRequest.getBrokerId();
+      // First check terminate exception and use it as the response if exists. We want to return the termination reason
+      // when query is explicitly terminated.
+      QueryException queryException = QueryThreadContext.getTerminateException();
+      // Do not log exception when query is explicitly terminated
+      if (queryException == null && e instanceof QueryException) {
+        queryException = (QueryException) e;

Review Comment:
   Shouldn't this case also be treated like a response serialization error?
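   A minimal sketch of the handling this suggests: only an explicit termination is exempt from the serialization-error metric and log. A `QueryException` thrown by serialization itself is still counted. `SketchQueryException`, `SketchSerializer` and the `AtomicLong` counter are hypothetical stand-ins for `QueryException` and the `RESPONSE_SERIALIZATION_EXCEPTIONS` meter, not Pinot APIs.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Pinot's QueryException.
class SketchQueryException extends RuntimeException {
  SketchQueryException(String message) {
    super(message);
  }
}

final class SketchSerializer {
  // Stand-in for the RESPONSE_SERIALIZATION_EXCEPTIONS meter.
  final AtomicLong serializationErrors = new AtomicLong();

  // terminateException is the explicit termination reason, or null if the query was not killed.
  RuntimeException onSerializationFailure(Exception e, SketchQueryException terminateException) {
    if (terminateException != null) {
      // Explicitly terminated: report the termination reason, do not count it as a serialization error.
      return terminateException;
    }
    // Any other failure, including a QueryException raised while serializing, is a serialization error.
    serializationErrors.incrementAndGet();
    return e instanceof SketchQueryException ? (SketchQueryException) e
        : new SketchQueryException("Error serializing response: " + e.getMessage());
  }
}
```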



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java:
##########
@@ -91,11 +90,14 @@ protected InstanceResponseBlock getNextBlock() {
         return buildInstanceResponseBlock(resultsBlock).toMetadataOnlyResponseBlock();
       }
     } catch (Throwable t) {
-      TerminationException terminateException = QueryThreadContext.getTerminateException();
-      if (terminateException != null) {
-        return new InstanceResponseBlock(new ExceptionResultsBlock(terminateException));
-      } else if (t instanceof QueryException) {
-        return new InstanceResponseBlock(new ExceptionResultsBlock((QueryException) t));
+      // First check terminate exception and use it as the results block if exists. We want to return the termination
+      // reason when query is explicitly terminated.
+      QueryException queryException = QueryThreadContext.getTerminateException();

Review Comment:
   Do we not want to even log the exception caught here if the query has been 
terminated explicitly?
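   One possible middle ground, sketched under the assumption that a low-level log is acceptable: keep the caught throwable in the logs at DEBUG when the query was explicitly terminated, and at ERROR otherwise. Log lines are collected in memory here instead of going through a real logger. `SketchTerminationLogging` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: log lines are collected in memory instead of going to a real logger.
final class SketchTerminationLogging {
  final List<String> logLines = new ArrayList<>();

  // terminateException is the explicit termination reason, or null if the query was not killed.
  RuntimeException handle(Throwable caught, RuntimeException terminateException) {
    if (terminateException != null) {
      // Keep a trace of what was actually thrown, but at a low level, so an explicit kill
      // does not produce an ERROR-level stack trace.
      logLines.add("DEBUG: query terminated (" + terminateException.getMessage() + "), caught: " + caught);
      return terminateException;
    }
    logLines.add("ERROR: caught exception while processing query: " + caught);
    return caught instanceof RuntimeException ? (RuntimeException) caught : new RuntimeException(caught);
  }
}
```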



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -74,15 +76,17 @@ public OpChainSchedulerService(ExecutorService executorService, PinotConfigurati
             MultiStageQueryRunner.DEFAULT_OF_CANCELLED_QUERY_CACHE_EXPIRE_MS));
   }
 
+  @VisibleForTesting
   public OpChainSchedulerService(ExecutorService executorService) {
-    this(executorService, MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_SIZE,
+    this("testServer", executorService, MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_SIZE,

Review Comment:
   This constructor shouldn't be public IMO since it isn't intended to be used 
externally in production code - can we make it package-private instead?
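   For illustration, a minimal sketch of the suggested shape. The production constructor stays `public`, while the test-only convenience constructor drops the access modifier and so becomes package-private. `SketchSchedulerService` and `DEFAULT_CACHE_SIZE` are hypothetical names. The `@VisibleForTesting` annotation from the diff (presumably Guava's) is left out only to keep the example self-contained.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical, trimmed-down shape of a service class; not Pinot's OpChainSchedulerService.
class SketchSchedulerService {
  static final int DEFAULT_CACHE_SIZE = 1000; // illustrative value only

  final String instanceId;
  final ExecutorService executorService;
  final int cacheSize;

  // Production constructor: stays public.
  public SketchSchedulerService(String instanceId, ExecutorService executorService, int cacheSize) {
    this.instanceId = instanceId;
    this.executorService = executorService;
    this.cacheSize = cacheSize;
  }

  // Test-only convenience constructor. With no access modifier it is package-private:
  // tests in the same package can call it, code in other packages cannot.
  SketchSchedulerService(ExecutorService executorService) {
    this("testServer", executorService, DEFAULT_CACHE_SIZE);
  }
}
```

   Since unit tests normally live in the same package as the class under test, they can still use the short constructor.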



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedBrokerQueryKillingIntegrationTest.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class CpuBasedBrokerQueryKillingIntegrationTest extends BaseQueryKillingIntegrationTest {
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    super.overrideBrokerConf(brokerConf);
+
+    String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+    brokerConf.setProperty(prefix + Accounting.CONFIG_OF_FACTORY_NAME, ResourceUsageAccountantFactory.class.getName());
+    brokerConf.setProperty(prefix + Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
+    brokerConf.setProperty(prefix + Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED, true);
+    brokerConf.setProperty(prefix + Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS, 500);
+    brokerConf.setProperty(prefix + Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 1.1f);
+    brokerConf.setProperty(prefix + Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 1.1f);
+  }
+
+  @Test
+  public void testCpuTimeKill()
+      throws Exception {
+    // Trigger broker side CPU kill with a large streaming query
+    // Do not run other queries because they can cause OOM on server first
+    setUseMultiStageQueryEngine(true);
+    verifyCpuTimeKill(LARGE_SELECT_STAR_QUERY, postQuery(LARGE_SELECT_STAR_QUERY));

Review Comment:
   Hm this seems like it could be prone to test flakiness? How do we know that 
this query will be deterministically killed on the broker but never on the 
server?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

