This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8e1a74d2a2 Add Tabled Is Disabled Error (#14199)
8e1a74d2a2 is described below
commit 8e1a74d2a22ab8b3087f5de97913bb4a59dfce57
Author: ashishjayamohan <[email protected]>
AuthorDate: Sun Oct 27 16:59:55 2024 -0700
Add Tabled Is Disabled Error (#14199)
---
.../BaseSingleStageBrokerRequestHandler.java | 33 ++++++++++++++++++++--
.../pinot/broker/routing/BrokerRoutingManager.java | 27 ++++++++++++++++--
.../pinot/common/exception/QueryException.java | 5 ++++
.../response/broker/BrokerResponseNative.java | 2 ++
4 files changed, 62 insertions(+), 5 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index c9862d1af0..42ae1d13ab 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -485,6 +485,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
if (realtimeTableName == null) {
realtimeTableConfig = null;
}
+
HandlerContext handlerContext = getHandlerContext(offlineTableConfig,
realtimeTableConfig);
if (handlerContext._disableGroovy) {
rejectGroovyQuery(serverPinotQuery);
@@ -620,9 +621,16 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
Map<ServerInstance, Pair<List<String>, List<String>>>
realtimeRoutingTable = null;
List<String> unavailableSegments = new ArrayList<>();
int numPrunedSegmentsTotal = 0;
+ boolean offlineTableDisabled = false;
+ boolean realtimeTableDisabled = false;
+ List<ProcessingException> exceptions = new ArrayList<>();
if (offlineBrokerRequest != null) {
+ offlineTableDisabled =
_routingManager.isTableDisabled(offlineTableName);
// NOTE: Routing table might be null if table is just removed
- RoutingTable routingTable =
_routingManager.getRoutingTable(offlineBrokerRequest, requestId);
+ RoutingTable routingTable = null;
+ if (!offlineTableDisabled) {
+ routingTable = _routingManager.getRoutingTable(offlineBrokerRequest,
requestId);
+ }
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>>
serverInstanceToSegmentsMap =
@@ -638,8 +646,12 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
}
}
if (realtimeBrokerRequest != null) {
+ realtimeTableDisabled =
_routingManager.isTableDisabled(realtimeTableName);
// NOTE: Routing table might be null if table is just removed
- RoutingTable routingTable =
_routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
+ RoutingTable routingTable = null;
+ if (!realtimeTableDisabled) {
+ routingTable =
_routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
+ }
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>>
serverInstanceToSegmentsMap =
@@ -654,10 +666,25 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
realtimeBrokerRequest = null;
}
}
+
+ if (offlineTableDisabled || realtimeTableDisabled) {
+ String errorMessage = null;
+ if (((realtimeTableConfig != null && offlineTableConfig != null) &&
(offlineTableDisabled
+ && realtimeTableDisabled)) || (offlineTableConfig == null &&
realtimeTableDisabled) || (
+ realtimeTableConfig == null && offlineTableDisabled)) {
+
requestContext.setErrorCode(QueryException.TABLE_IS_DISABLED_ERROR_CODE);
+ return BrokerResponseNative.TABLE_IS_DISABLED;
+ } else if ((realtimeTableConfig != null && offlineTableConfig != null)
&& realtimeTableDisabled) {
+ errorMessage = "Realtime table is disabled in hybrid table";
+ } else if ((realtimeTableConfig != null && offlineTableConfig != null)
&& offlineTableDisabled) {
+ errorMessage = "Offline table is disabled in hybrid table";
+ }
+
exceptions.add(QueryException.getException(QueryException.TABLE_IS_DISABLED_ERROR,
errorMessage));
+ }
+
int numUnavailableSegments = unavailableSegments.size();
requestContext.setNumUnavailableSegments(numUnavailableSegments);
- List<ProcessingException> exceptions = new ArrayList<>();
if (numUnavailableSegments > 0) {
String errorMessage;
if (numUnavailableSegments >
MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index f2f65c91e8..b6f82e0705 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -533,7 +533,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
RoutingEntry routingEntry =
new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath,
segmentPreSelector, segmentSelector,
segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
- timeBoundaryManager, partitionMetadataManager, queryTimeoutMs);
+ timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
!idealState.isEnabled());
if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
LOGGER.info("Built routing for table: {}", tableNameWithType);
} else {
@@ -603,6 +603,20 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
return _routingEntryMap.containsKey(tableNameWithType);
}
+ /**
+ * Returns whether the given table is enabled
+ * @param tableNameWithType Table name with type
+ * @return Whether the given table is enabled
+ */
+ public boolean isTableDisabled(String tableNameWithType) {
+ RoutingEntry routingEntry =
_routingEntryMap.getOrDefault(tableNameWithType, null);
+ if (routingEntry == null) {
+ return false;
+ } else {
+ return routingEntry.isDisabled();
+ }
+ }
+
/**
* Returns the routing table (a map from server instance to list of segments
hosted by the server, and a list of
* unavailable segments) based on the broker request, or {@code null} if the
routing does not exist.
@@ -729,11 +743,14 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
// Time boundary manager is only available for the offline part of the
hybrid table
transient TimeBoundaryManager _timeBoundaryManager;
+ transient boolean _disabled;
+
RoutingEntry(String tableNameWithType, String idealStatePath, String
externalViewPath,
SegmentPreSelector segmentPreSelector, SegmentSelector
segmentSelector, List<SegmentPruner> segmentPruners,
InstanceSelector instanceSelector, int lastUpdateIdealStateVersion,
int lastUpdateExternalViewVersion,
SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable
TimeBoundaryManager timeBoundaryManager,
- @Nullable SegmentPartitionMetadataManager partitionMetadataManager,
@Nullable Long queryTimeoutMs) {
+ @Nullable SegmentPartitionMetadataManager partitionMetadataManager,
@Nullable Long queryTimeoutMs,
+ boolean disabled) {
_tableNameWithType = tableNameWithType;
_idealStatePath = idealStatePath;
_externalViewPath = externalViewPath;
@@ -747,6 +764,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
_partitionMetadataManager = partitionMetadataManager;
_queryTimeoutMs = queryTimeoutMs;
_segmentZkMetadataFetcher = segmentZkMetadataFetcher;
+ _disabled = disabled;
}
String getTableNameWithType() {
@@ -779,6 +797,10 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
return _queryTimeoutMs;
}
+ boolean isDisabled() {
+ return _disabled;
+ }
+
// NOTE: The change gets applied in sequence, and before change applied to
all components, there could be some
// inconsistency between components, which is fine because the
inconsistency only exists for the newly changed
// segments and only lasts for a very short time.
@@ -793,6 +815,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
_lastUpdateIdealStateVersion = idealState.getStat().getVersion();
_lastUpdateExternalViewVersion = externalView.getStat().getVersion();
+ _disabled = !idealState.isEnabled();
}
void onInstancesChange(Set<String> enabledInstances, List<String>
changedInstances) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 82ed026aca..f6e563f62c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -53,6 +53,7 @@ public class QueryException {
public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170;
public static final int ACCESS_DENIED_ERROR_CODE = 180;
public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190;
+ public static final int TABLE_IS_DISABLED_ERROR_CODE = 191;
public static final int QUERY_EXECUTION_ERROR_CODE = 200;
public static final int QUERY_CANCELLATION_ERROR_CODE = 503;
// TODO: Handle these errors in broker
@@ -95,6 +96,8 @@ public class QueryException {
new ProcessingException(COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE);
public static final ProcessingException TABLE_DOES_NOT_EXIST_ERROR =
new ProcessingException(TABLE_DOES_NOT_EXIST_ERROR_CODE);
+ public static final ProcessingException TABLE_IS_DISABLED_ERROR =
+ new ProcessingException(TABLE_IS_DISABLED_ERROR_CODE);
public static final ProcessingException QUERY_EXECUTION_ERROR = new
ProcessingException(QUERY_EXECUTION_ERROR_CODE);
public static final ProcessingException QUERY_CANCELLATION_ERROR =
new ProcessingException(QUERY_CANCELLATION_ERROR_CODE);
@@ -146,6 +149,7 @@ public class QueryException {
SEGMENT_PLAN_EXECUTION_ERROR.setMessage("SegmentPlanExecutionError");
COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR.setMessage("CombineSegmentPlanTimeoutError");
TABLE_DOES_NOT_EXIST_ERROR.setMessage("TableDoesNotExistError");
+ TABLE_IS_DISABLED_ERROR.setMessage("TableIsDisabledError");
QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
QUERY_CANCELLATION_ERROR.setMessage("QueryCancellationError");
SERVER_SCHEDULER_DOWN_ERROR.setMessage("ServerShuttingDown");
@@ -230,6 +234,7 @@ public class QueryException {
case QueryException.SQL_PARSING_ERROR_CODE:
case QueryException.TOO_MANY_REQUESTS_ERROR_CODE:
case QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE:
+ case QueryException.TABLE_IS_DISABLED_ERROR_CODE:
case QueryException.UNKNOWN_COLUMN_ERROR_CODE:
return true;
default:
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 6cdfb3cc15..83a30a10a3 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -60,6 +60,8 @@ public class BrokerResponseNative implements BrokerResponse {
new BrokerResponseNative(QueryException.BROKER_RESOURCE_MISSING_ERROR);
public static final BrokerResponseNative TABLE_DOES_NOT_EXIST =
new BrokerResponseNative(QueryException.TABLE_DOES_NOT_EXIST_ERROR);
+ public static final BrokerResponseNative TABLE_IS_DISABLED =
+ new BrokerResponseNative(QueryException.TABLE_IS_DISABLED_ERROR);
public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT =
getBrokerResponseExplainPlanOutput();
private ResultTable _resultTable;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]