xiangfu0 commented on code in PR #18529:
URL: https://github.com/apache/pinot/pull/18529#discussion_r3298092872
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -2034,9 +2341,253 @@ static void validateRequest(PinotQuery pinotQuery, int
queryResponseLimit) {
}
}
+ /// Broker-side helper invoked by the MV handler's split-dispatcher
callback. Owns the generic
+ /// Execute a SPLIT_REWRITE plan via the configured MV handler. Returns a
finalized
+ /// `BrokerResponseNative` on success, or `null` to signal the caller should
fall through to
+ /// the non-split base-table path. Execute-time MV failures (route gone
mid-query, malformed
+ /// time-format from a pre-V2 rolling-upgrade znode, NPE inside a strategy
plan) must never
+ /// surface as HTTP 500 — they bump `QUERY_REWRITE_EXCEPTIONS` and return
null.
+ /// `BROKER_RESOURCE_MISSING` (MV dropped mid-query) is demoted to WARN so
the operator's
+ /// error-log signal isn't drowned in routine MV churn.
+ @Nullable
+ private BrokerResponseNative tryExecuteMaterializedViewSplit(long requestId,
String query,
+ SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity
requesterIdentity,
+ RequestContext requestContext, BrokerRequest brokerRequest, PinotQuery
serverPinotQuery, Schema schema,
+ MaterializedViewContext materializedViewContext, TableRouteInfo
routeInfo, long remainingTimeMs,
+ List<QueryProcessingException> errorMsgs, int numPrunedSegmentsTotal,
String database,
+ String tableName, String userRawTableName, boolean rlsFiltersApplied,
ServerStats serverStats,
+ RoutingManager selectedRoutingManager, TableRouteProvider routeProvider,
boolean queryWasLogged) {
+ String clientRequestId = extractClientRequestId(sqlNodeAndOptions);
+ MaterializedViewSplitDispatcher dispatcher =
+ (originalReq, baseQ, baseRoute, baseSch, viewQ, viewTable, viewSch,
timeoutMs) ->
+ dispatchMaterializedViewSplit(requestId, originalReq, baseQ,
baseRoute, baseSch, viewQ, viewTable,
+ viewSch, timeoutMs, serverStats, requestContext,
selectedRoutingManager, routeProvider,
+ clientRequestId, query);
+ MaterializedViewSplitExecutionContext splitCtx =
MaterializedViewSplitExecutionContext.builder()
+ .originalBrokerRequest(brokerRequest)
+ .baseServerPinotQuery(serverPinotQuery)
+ .baseSchema(schema)
+ .materializedViewContext(materializedViewContext)
+ .baseRouteInfo(routeInfo)
+ .remainingTimeMs(remainingTimeMs)
+ .dispatcher(dispatcher)
+ .build();
+ BrokerResponseNative viewSplitResponse;
+ try {
+ viewSplitResponse = _materializedViewHandler.executeSplit(splitCtx);
+ } catch (QueryException qe) {
+ LOGGER.warn("Materialized view split execution skipped for request {} on
table {}: {}; "
+ + "falling back to base-table query path", requestId,
userRawTableName, qe.getMessage());
+ _brokerMetrics.addMeteredTableValue(userRawTableName,
BrokerMeter.QUERY_REWRITE_EXCEPTIONS, 1);
+ return null;
+ } catch (Exception e) {
+ LOGGER.error("Materialized view split execution failed for request {} on
table {}; "
+ + "falling back to base-table query path", requestId,
userRawTableName, e);
+ _brokerMetrics.addMeteredTableValue(userRawTableName,
BrokerMeter.QUERY_REWRITE_EXCEPTIONS, 1);
+ return null;
+ }
+ viewSplitResponse.setTablesQueried(Set.of(userRawTableName));
+ _materializedViewHandler.annotateResponse(viewSplitResponse,
materializedViewContext);
+ for (QueryProcessingException errorMsg : errorMsgs) {
+ viewSplitResponse.addException(errorMsg);
+ }
+ viewSplitResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal);
+ fillEmptyResponseSchema(brokerRequest.getPinotQuery(), viewSplitResponse,
schema, database, query);
+ long totalTimeMs = System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis();
+ viewSplitResponse.setTimeUsedMs(totalTimeMs);
+ augmentStatistics(requestContext, viewSplitResponse);
+ viewSplitResponse.setRLSFiltersApplied(rlsFiltersApplied);
+ _queryLogger.logQueryCompleted(
+ new QueryLogger.QueryLogParams(requestContext, tableName,
viewSplitResponse,
+ QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE,
requesterIdentity, serverStats),
+ queryWasLogged);
+ return viewSplitResponse;
+ }
+
+ /// route-build + scatter-gather + reduce work (hybrid offline/realtime
split, always-false-filter
+ /// pruning, MV-side route, deep-copy of the reduce request to strip
`SERVER_RETURN_FINAL_RESULT`).
+ /// MV-specific concerns (time-boundary computation and per-branch filter
attachment) live in the
+ /// MV handler implementation, not here.
+ private BrokerResponseNative dispatchMaterializedViewSplit(long requestId,
+ BrokerRequest originalBrokerRequest, PinotQuery baseQueryWithTimeFilter,
TableRouteInfo baseRouteInfo,
+ Schema baseSchema, PinotQuery viewQueryWithTimeFilter, String
viewTableNameWithType, Schema viewSchema,
+ long timeoutMs, ServerStats serverStats, RequestContext requestContext,
RoutingManager routingManager,
+ TableRouteProvider routeProvider, @Nullable String clientRequestId,
String query) throws Exception {
+ /// 1. Optimize and route the base-table side (handles hybrid
offline/realtime split).
+ _queryOptimizer.optimize(baseQueryWithTimeFilter, baseSchema);
+ BrokerRequest baseBrokerRequest =
CalciteSqlCompiler.convertToBrokerRequest(baseQueryWithTimeFilter);
+ ImplicitHybridTableRouteInfo hybridBaseRoute =
+ prepareBaseTableHybridRoute(baseBrokerRequest, baseRouteInfo,
baseSchema);
+
+ BrokerRequest baseOfflineBrokerRequest =
hybridBaseRoute.getOfflineBrokerRequest();
+ BrokerRequest baseRealtimeBrokerRequest =
hybridBaseRoute.getRealtimeBrokerRequest();
+ if (baseOfflineBrokerRequest != null &&
isFilterAlwaysFalse(baseOfflineBrokerRequest.getPinotQuery())) {
+ baseOfflineBrokerRequest = null;
+ }
+ if (baseRealtimeBrokerRequest != null &&
isFilterAlwaysFalse(baseRealtimeBrokerRequest.getPinotQuery())) {
+ baseRealtimeBrokerRequest = null;
+ }
+ hybridBaseRoute.setOfflineBrokerRequest(baseOfflineBrokerRequest);
+ hybridBaseRoute.setRealtimeBrokerRequest(baseRealtimeBrokerRequest);
+ routeProvider.calculateRoutes(hybridBaseRoute, routingManager,
baseOfflineBrokerRequest, baseRealtimeBrokerRequest,
+ requestId);
+
+ /// 2. Optimize and route the MV-table side (always offline).
+ /// Apply MV-table expression overrides and `$ts$DAY` timestamp-index
hints before
+ /// optimization, mirroring the non-split path. Without this, overrides
configured on the
+ /// MV table would be silently dropped on the SPLIT path.
+ handleExpressionOverride(viewQueryWithTimeFilter,
_tableCache.getExpressionOverrideMap(viewTableNameWithType));
+ handleTimestampIndexOverride(viewQueryWithTimeFilter,
_tableCache.getTableConfig(viewTableNameWithType));
+ _queryOptimizer.optimize(viewQueryWithTimeFilter, viewSchema);
+ BrokerRequest viewBrokerRequest =
CalciteSqlCompiler.convertToBrokerRequest(viewQueryWithTimeFilter);
+ TableRouteInfo viewRouteInfo =
routeProvider.getTableRouteInfo(viewTableNameWithType, _tableCache,
routingManager);
+ /// The MV table can disappear between compile (broker MV cache) and
execute (route lookup)
+ /// — operator dropped it, controller mid-state-transition, etc. Treat
missing-route the
+ /// same way the base path does (`BROKER_RESOURCE_MISSING` with a graceful
message); the
+ /// caller's try/catch wrapper then falls back to the base table.
+ if (!viewRouteInfo.isExists()) {
+ throw new QueryException(QueryErrorCode.BROKER_RESOURCE_MISSING,
+ "Materialized view " + viewTableNameWithType
+ + " is no longer routable (table may have been dropped after
rewrite compile)");
+ }
+ /// `MaterializedViewTaskScheduler.generateTasks` only generates for
OFFLINE tables, so the
+ /// MV route MUST be OFFLINE-only. Fail loud rather than silently
misroute against a
+ /// HYBRID/REALTIME table that operators may have created by mistake.
+ if (!viewRouteInfo.isOffline()) {
+ /// Throw a typed `QueryException` (caught at WARN by
`tryExecuteMaterializedViewSplit`)
+ /// rather than `IllegalStateException` (which would log at ERROR with
stack), since a
+ /// HYBRID/REALTIME MV route is an operator misconfiguration the broker
can recover from
+ /// by falling back to the base-table path — same recovery model as the
missing-route
+ /// case above.
+ throw new QueryException(QueryErrorCode.BROKER_RESOURCE_MISSING,
+ "MV split routing requires an OFFLINE materialized-view table, got
route type for "
+ + viewTableNameWithType + ": hybrid=" + viewRouteInfo.isHybrid()
+ + ", offline=" + viewRouteInfo.isOffline());
+ }
+ routeProvider.calculateRoutes(viewRouteInfo, routingManager,
viewBrokerRequest, null, requestId);
+
+ /// 3. Build the reduce-time broker request (strip
`SERVER_RETURN_FINAL_RESULT`).
+ ///
+ /// The `originalBrokerRequest` may carry
`SERVER_RETURN_FINAL_RESULT=true` set by the outer
+ /// `doHandleRequest` flow (when `numServers == 1`). Since
Review Comment:
`materializedViewRequestId` creates a second live scatter-gather, but query
cancellation still tracks only `requestId`, and `handleCancel()` sends DELETEs
only for `getGlobalQueryId(requestId)`. On a long-running split rewrite,
cancelling the query therefore stops only the base branch, while the MV branch
keeps running until it times out or completes. We need to either cancel both
request ids or reuse a single cancellable id for both branches.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -308,15 +308,32 @@ public Map<String, String>
getMaterializedViewTaskConfigs() {
return
_taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
}
- /// Whether the table has a non-empty `definedSQL` under
[CommonConstants.MaterializedViewTask#TASK_TYPE].
+ /// Whether the table has a non-empty `definedSQL` under any registered
MV-style task type.
+ ///
+ /// Recognizes both the built-in OSS
[CommonConstants.MaterializedViewTask#TASK_TYPE] and any
+ /// downstream task variant (e.g. StarTree's `MseMaterializedViewTask` for
multi-stage MVs) by
+ /// scanning every task config for a non-empty `definedSQL` key. This keeps
the
+ /// `isMaterializedView=true` invariant compatible with the wider ecosystem
without OSS having to
+ /// hard-code each variant's task-type label.
@JsonIgnore
public boolean hasMaterializedViewTaskWithDefinedSql() {
Review Comment:
`hasMaterializedViewTaskWithDefinedSql()` now treats any task config with
`definedSQL` as MV identity, but `validateMaterializedViewConfigUpdate()` still
reads `definedSQL` only from the built-in `MaterializedViewTask`. That lets a
downstream MV-style task change `definedSQL` in place without tripping the
immutability guard, even though the persisted MV data was built from the old
definition. The update path needs the same generic task scan as this helper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]