This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb06abc2c2f [Query Resource Isolation] Fix Refresh message (#16636)
fb06abc2c2f is described below
commit fb06abc2c2f0b956e6cdbe4ec2875f3de9abcce9
Author: Praveen <[email protected]>
AuthorDate: Thu Aug 28 12:52:20 2025 -0700
[Query Resource Isolation] Fix Refresh message (#16636)
* Fix Refresh message
* delete queryworkload message handler
* info -> debug logs
---
.../BrokerUserDefinedMessageHandlerFactory.java | 28 ++++++-
.../messages/QueryWorkloadRefreshMessage.java | 2 +-
.../server/starter/helix/BaseServerStarter.java | 6 --
.../helix/QueryWorkloadMessageHandlerFactory.java | 87 ----------------------
.../helix/SegmentMessageHandlerFactory.java | 51 +++++++++++++
.../spi/accounting/WorkloadBudgetManager.java | 15 +++-
6 files changed, 88 insertions(+), 101 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 1b2e7ed0452..68dbc098bd8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,20 +269,39 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
private static class QueryWorkloadRefreshMessageHandler extends
MessageHandler {
final String _queryWorkloadName;
final InstanceCost _instanceCost;
+ final String _messageType;
QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage
queryWorkloadRefreshMessage,
NotificationContext context) {
super(queryWorkloadRefreshMessage, context);
_queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
_instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+ _messageType = queryWorkloadRefreshMessage.getMsgSubType();
}
@Override
public HelixTaskResult handleMessage() {
- // TODO: Add logic to invoke the query workload manager to
refresh/delete the query workload config
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- return result;
+ LOGGER.info("Handling query workload message: {}", _message);
+ try {
+ if
(_messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE))
{
+
Tracing.ThreadAccountantOps.getWorkloadBudgetManager().deleteWorkload(_queryWorkloadName);
+ } else if
(_messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE))
{
+ if (_instanceCost == null) {
+ throw new IllegalStateException(
+ "Instance cost is not provided for refreshing query workload:
" + _queryWorkloadName);
+ }
+ Tracing.ThreadAccountantOps.getWorkloadBudgetManager()
+ .addOrUpdateWorkload(_queryWorkloadName,
_instanceCost.getCpuCostNs(), _instanceCost.getMemoryCostBytes());
+ } else {
+ throw new IllegalStateException("Unknown message type: " +
_messageType);
+ }
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to handle query workload message: {}",
_queryWorkloadName, e);
+ throw e;
+ }
}
@Override
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
index 85f4da123ed..70c13193f60 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
@@ -54,7 +54,7 @@ public class QueryWorkloadRefreshMessage extends Message {
public QueryWorkloadRefreshMessage(Message message) {
super(message.getRecord());
if (!message.getMsgSubType().equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
- ||
!message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+ &&
!message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
throw new IllegalArgumentException("Unknown message subtype:" +
message.getMsgSubType());
}
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 0c59296e8f8..e7397a58776 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -750,12 +750,6 @@ public abstract class BaseServerStarter implements
ServiceStartable {
new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
messageHandlerFactory);
- // Query workload message handler factory
- QueryWorkloadMessageHandlerFactory queryWorkloadMessageHandlerFactory =
- new QueryWorkloadMessageHandlerFactory(serverMetrics);
- _helixManager.getMessagingService()
-
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
- queryWorkloadMessageHandlerFactory);
serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () ->
_helixManager.isConnected() ? 1L : 0L);
_helixManager.addPreConnectCallback(
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
deleted file mode 100644
index bdf91fc7ff5..00000000000
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.helix;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.Message;
-import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.spi.config.workload.InstanceCost;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QueryWorkloadMessageHandlerFactory implements
MessageHandlerFactory {
- private static final Logger LOGGER =
LoggerFactory.getLogger(QueryWorkloadMessageHandlerFactory.class);
- private final ServerMetrics _metrics;
-
- public QueryWorkloadMessageHandlerFactory(ServerMetrics metrics) {
- _metrics = metrics;
- }
-
- @Override
- public MessageHandler createHandler(Message message, NotificationContext
context) {
- String messageType = message.getMsgSubType();
- if
(messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
- ||
messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE))
{
- return new QueryWorkloadRefreshMessageHandler(new
QueryWorkloadRefreshMessage(message), context);
- } else {
- throw new IllegalArgumentException("Unknown message subtype: " +
messageType);
- }
- }
-
- // Gets called once during start up. We must return the same message type
that this factory is registered for.
- @Override
- public String getMessageType() {
- return Message.MessageType.USER_DEFINE_MSG.toString();
- }
-
- @Override
- public void reset() {
- LOGGER.info("Reset called");
- }
-
- private static class QueryWorkloadRefreshMessageHandler extends
MessageHandler {
- final String _queryWorkloadName;
- final InstanceCost _instanceCost;
-
- QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage
queryWorkloadRefreshMessage,
- NotificationContext context) {
- super(queryWorkloadRefreshMessage, context);
- _queryWorkloadName =
queryWorkloadRefreshMessage.getQueryWorkloadName();
- _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
- }
-
- @Override
- public HelixTaskResult handleMessage() {
- // TODO: Add logic to invoke the query workload manager to
refresh/delete the query workload config
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode errorCode, ErrorType
errorType) {
- LOGGER.error("Got error while refreshing query workload config for
query workload: {} (error code: {},"
- + " error type: {})", _queryWorkloadName, errorCode,
errorType, e);
- }
- }
-}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 3429ec0c3ce..a25b449810b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.Message;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigSchemaRefreshMessage;
@@ -43,6 +44,8 @@ import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +80,9 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
return new IngestionMetricsRemoveMessageHandler(new
IngestionMetricsRemoveMessage(message), _metrics, context);
case TableConfigSchemaRefreshMessage.REFRESH_TABLE_CONFIG_AND_SCHEMA:
return new TableSchemaRefreshMessageHandler(new
TableConfigSchemaRefreshMessage(message), _metrics, context);
+ case QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE:
+ case QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE:
+ return new QueryWorkloadRefreshMessageHandler(new
QueryWorkloadRefreshMessage(message), _metrics, context);
default:
LOGGER.warn("Unsupported user defined message sub type: {} for
segment: {}", msgSubType,
message.getPartitionName());
@@ -273,6 +279,51 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
}
}
+ private static class QueryWorkloadRefreshMessageHandler extends
DefaultMessageHandler {
+ final String _queryWorkloadName;
+ final InstanceCost _instanceCost;
+ final String _messageType;
+
+ QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage
queryWorkloadRefreshMessage,
+ ServerMetrics metrics,
NotificationContext context) {
+ super(queryWorkloadRefreshMessage, metrics, context);
+ _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
+ _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+ _messageType = queryWorkloadRefreshMessage.getMsgSubType();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ LOGGER.info("Handling query workload message: {}", _message);
+ try {
+ if
(_messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE))
{
+
Tracing.ThreadAccountantOps.getWorkloadBudgetManager().deleteWorkload(_queryWorkloadName);
+ } else if
(_messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE))
{
+ if (_instanceCost == null) {
+ throw new IllegalStateException(
+ "Instance cost is not provided for refreshing query workload:
" + _queryWorkloadName);
+ }
+ Tracing.ThreadAccountantOps.getWorkloadBudgetManager()
+ .addOrUpdateWorkload(_queryWorkloadName,
_instanceCost.getCpuCostNs(), _instanceCost.getMemoryCostBytes());
+ } else {
+ throw new IllegalStateException("Unknown message type: " +
_messageType);
+ }
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to handle query workload message: {}",
_queryWorkloadName, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode errorCode, ErrorType errorType)
{
+ LOGGER.error("Got error while refreshing query workload config for query
workload: {} (error code: {},"
+ + " error type: {})", _queryWorkloadName, errorCode, errorType, e);
+ }
+ }
+
private static class DefaultMessageHandler extends MessageHandler {
final String _segmentName;
final String _tableNameWithType;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
index 83f03931c3f..ddd2734dc76 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
@@ -33,7 +33,7 @@ public class WorkloadBudgetManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(WorkloadBudgetManager.class);
private long _enforcementWindowMs;
- private final ConcurrentHashMap<String, Budget> _workloadBudgets = new
ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Budget> _workloadBudgets;
private final ScheduledExecutorService _resetScheduler =
Executors.newSingleThreadScheduledExecutor();
private volatile boolean _isEnabled;
@@ -45,6 +45,7 @@ public class WorkloadBudgetManager {
LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op
instance.");
return;
}
+ _workloadBudgets = new ConcurrentHashMap<>();
_enforcementWindowMs =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
initSecondaryWorkloadBudget(config);
@@ -113,6 +114,15 @@ public class WorkloadBudgetManager {
memoryBudgetBytes);
}
+ public void deleteWorkload(String workload) {
+ if (!_isEnabled) {
+ LOGGER.info("WorkloadBudgetManager is disabled. Not deleting workload:
{}", workload);
+ return;
+ }
+ _workloadBudgets.remove(workload);
+ LOGGER.info("Removed workload: {}", workload);
+ }
+
/**
* Collects workload stats for CPU and memory usage.
* Could be overridden for custom implementations
@@ -172,8 +182,7 @@ public class WorkloadBudgetManager {
* Periodically resets budgets at the end of each enforcement window
(Thread-Safe).
*/
private void startBudgetResetTask() {
- // TODO(Vivek): Reduce logging verbosity. Maybe make it debug logs.
- LOGGER.info("Starting budget reset task with enforcement window: {}ms",
_enforcementWindowMs);
+ LOGGER.debug("Starting budget reset task with enforcement window: {}ms",
_enforcementWindowMs);
_resetScheduler.scheduleAtFixedRate(() -> {
LOGGER.debug("Resetting all workload budgets.");
// Also print the budget used in the last enforcement window.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]