xiangfu0 commented on code in PR #18529:
URL: https://github.com/apache/pinot/pull/18529#discussion_r3298092872


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -2034,9 +2341,253 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
     }
   }
 
+  /// Broker-side helper invoked by the MV handler's split-dispatcher callback. Owns the generic
+  /// Execute a SPLIT_REWRITE plan via the configured MV handler.  Returns a finalized
+  /// `BrokerResponseNative` on success, or `null` to signal the caller should fall through to
+  /// the non-split base-table path.  Execute-time MV failures (route gone mid-query, malformed
+  /// time-format from a pre-V2 rolling-upgrade znode, NPE inside a strategy plan) must never
+  /// surface as HTTP 500; they bump `QUERY_REWRITE_EXCEPTIONS` and return null.
+  /// `BROKER_RESOURCE_MISSING` (MV dropped mid-query) is demoted to WARN so the operator's
+  /// error-log signal isn't drowned in routine MV churn.
+  @Nullable
+  private BrokerResponseNative tryExecuteMaterializedViewSplit(long requestId, String query,
+      SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity requesterIdentity,
+      RequestContext requestContext, BrokerRequest brokerRequest, PinotQuery serverPinotQuery, Schema schema,
+      MaterializedViewContext materializedViewContext, TableRouteInfo routeInfo, long remainingTimeMs,
+      List<QueryProcessingException> errorMsgs, int numPrunedSegmentsTotal, String database,
+      String tableName, String userRawTableName, boolean rlsFiltersApplied, ServerStats serverStats,
+      RoutingManager selectedRoutingManager, TableRouteProvider routeProvider, boolean queryWasLogged) {
+    String clientRequestId = extractClientRequestId(sqlNodeAndOptions);
+    MaterializedViewSplitDispatcher dispatcher =
+        (originalReq, baseQ, baseRoute, baseSch, viewQ, viewTable, viewSch, timeoutMs) ->
+            dispatchMaterializedViewSplit(requestId, originalReq, baseQ, baseRoute, baseSch, viewQ, viewTable,
+                viewSch, timeoutMs, serverStats, requestContext, selectedRoutingManager, routeProvider,
+                clientRequestId, query);
+    MaterializedViewSplitExecutionContext splitCtx = MaterializedViewSplitExecutionContext.builder()
+        .originalBrokerRequest(brokerRequest)
+        .baseServerPinotQuery(serverPinotQuery)
+        .baseSchema(schema)
+        .materializedViewContext(materializedViewContext)
+        .baseRouteInfo(routeInfo)
+        .remainingTimeMs(remainingTimeMs)
+        .dispatcher(dispatcher)
+        .build();
+    BrokerResponseNative viewSplitResponse;
+    try {
+      viewSplitResponse = _materializedViewHandler.executeSplit(splitCtx);
+    } catch (QueryException qe) {
+      LOGGER.warn("Materialized view split execution skipped for request {} on table {}: {}; "
+          + "falling back to base-table query path", requestId, userRawTableName, qe.getMessage());
+      _brokerMetrics.addMeteredTableValue(userRawTableName, BrokerMeter.QUERY_REWRITE_EXCEPTIONS, 1);
+      return null;
+    } catch (Exception e) {
+      LOGGER.error("Materialized view split execution failed for request {} on table {}; "
+          + "falling back to base-table query path", requestId, userRawTableName, e);
+      _brokerMetrics.addMeteredTableValue(userRawTableName, BrokerMeter.QUERY_REWRITE_EXCEPTIONS, 1);
+      return null;
+    }
+    viewSplitResponse.setTablesQueried(Set.of(userRawTableName));
+    _materializedViewHandler.annotateResponse(viewSplitResponse, materializedViewContext);
+    for (QueryProcessingException errorMsg : errorMsgs) {
+      viewSplitResponse.addException(errorMsg);
+    }
+    viewSplitResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal);
+    fillEmptyResponseSchema(brokerRequest.getPinotQuery(), viewSplitResponse, schema, database, query);
+    long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
+    viewSplitResponse.setTimeUsedMs(totalTimeMs);
+    augmentStatistics(requestContext, viewSplitResponse);
+    viewSplitResponse.setRLSFiltersApplied(rlsFiltersApplied);
+    _queryLogger.logQueryCompleted(
+        new QueryLogger.QueryLogParams(requestContext, tableName, viewSplitResponse,
+            QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, serverStats),
+        queryWasLogged);
+    return viewSplitResponse;
+  }
+
+  /// route-build + scatter-gather + reduce work (hybrid offline/realtime split, always-false-filter
+  /// pruning, MV-side route, deep-copy of the reduce request to strip `SERVER_RETURN_FINAL_RESULT`).
+  /// MV-specific concerns (time-boundary computation and per-branch filter attachment) live in the
+  /// MV handler implementation, not here.
+  private BrokerResponseNative dispatchMaterializedViewSplit(long requestId,
+      BrokerRequest originalBrokerRequest, PinotQuery baseQueryWithTimeFilter, TableRouteInfo baseRouteInfo,
+      Schema baseSchema, PinotQuery viewQueryWithTimeFilter, String viewTableNameWithType, Schema viewSchema,
+      long timeoutMs, ServerStats serverStats, RequestContext requestContext, RoutingManager routingManager,
+      TableRouteProvider routeProvider, @Nullable String clientRequestId, String query) throws Exception {
+    /// 1. Optimize and route the base-table side (handles hybrid offline/realtime split).
+    _queryOptimizer.optimize(baseQueryWithTimeFilter, baseSchema);
+    BrokerRequest baseBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(baseQueryWithTimeFilter);
+    ImplicitHybridTableRouteInfo hybridBaseRoute =
+        prepareBaseTableHybridRoute(baseBrokerRequest, baseRouteInfo, baseSchema);
+
+    BrokerRequest baseOfflineBrokerRequest = hybridBaseRoute.getOfflineBrokerRequest();
+    BrokerRequest baseRealtimeBrokerRequest = hybridBaseRoute.getRealtimeBrokerRequest();
+    if (baseOfflineBrokerRequest != null && isFilterAlwaysFalse(baseOfflineBrokerRequest.getPinotQuery())) {
+      baseOfflineBrokerRequest = null;
+    }
+    if (baseRealtimeBrokerRequest != null && isFilterAlwaysFalse(baseRealtimeBrokerRequest.getPinotQuery())) {
+      baseRealtimeBrokerRequest = null;
+    }
+    hybridBaseRoute.setOfflineBrokerRequest(baseOfflineBrokerRequest);
+    hybridBaseRoute.setRealtimeBrokerRequest(baseRealtimeBrokerRequest);
+    routeProvider.calculateRoutes(hybridBaseRoute, routingManager, baseOfflineBrokerRequest, baseRealtimeBrokerRequest,
+        requestId);
+
+    /// 2. Optimize and route the MV-table side (always offline).
+    /// Apply MV-table expression overrides and `$ts$DAY` timestamp-index hints before
+    /// optimization, mirroring the non-split path.  Without this, overrides configured on the
+    /// MV table would be silently dropped on the SPLIT path.
+    handleExpressionOverride(viewQueryWithTimeFilter, _tableCache.getExpressionOverrideMap(viewTableNameWithType));
+    handleTimestampIndexOverride(viewQueryWithTimeFilter, _tableCache.getTableConfig(viewTableNameWithType));
+    _queryOptimizer.optimize(viewQueryWithTimeFilter, viewSchema);
+    BrokerRequest viewBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(viewQueryWithTimeFilter);
+    TableRouteInfo viewRouteInfo = routeProvider.getTableRouteInfo(viewTableNameWithType, _tableCache, routingManager);
+    /// The MV table can disappear between compile (broker MV cache) and execute (route lookup)
+    /// (operator dropped it, controller mid-state-transition, etc.).  Treat missing-route the
+    /// same way the base path does (`BROKER_RESOURCE_MISSING` with a graceful message); the
+    /// caller's try/catch wrapper then falls back to the base table.
+    if (!viewRouteInfo.isExists()) {
+      throw new QueryException(QueryErrorCode.BROKER_RESOURCE_MISSING,
+          "Materialized view " + viewTableNameWithType
+              + " is no longer routable (table may have been dropped after rewrite compile)");
+    }
+    /// `MaterializedViewTaskScheduler.generateTasks` only generates for OFFLINE tables, so the
+    /// MV route MUST be OFFLINE-only.  Fail loud rather than silently misroute against a
+    /// HYBRID/REALTIME table that operators may have created by mistake.
+    if (!viewRouteInfo.isOffline()) {
+      /// Throw a typed `QueryException` (caught at WARN by `tryExecuteMaterializedViewSplit`)
+      /// rather than `IllegalStateException` (which would log at ERROR with stack), since a
+      /// HYBRID/REALTIME MV route is an operator misconfiguration the broker can recover from
+      /// by falling back to the base-table path, the same recovery model as the missing-route
+      /// case above.
+      throw new QueryException(QueryErrorCode.BROKER_RESOURCE_MISSING,
+          "MV split routing requires an OFFLINE materialized-view table, got route type for "
+              + viewTableNameWithType + ": hybrid=" + viewRouteInfo.isHybrid()
+              + ", offline=" + viewRouteInfo.isOffline());
+    }
+    routeProvider.calculateRoutes(viewRouteInfo, routingManager, viewBrokerRequest, null, requestId);
+
+    /// 3. Build the reduce-time broker request (strip `SERVER_RETURN_FINAL_RESULT`).
+    ///
+    /// The `originalBrokerRequest` may carry `SERVER_RETURN_FINAL_RESULT=true` set by the outer
+    /// `doHandleRequest` flow (when `numServers == 1`).  Since

Review Comment:
   `materializedViewRequestId` creates a second live scatter-gather, but
cancellation still tracks only `requestId`, and `handleCancel()` sends DELETEs
only for `getGlobalQueryId(requestId)`. On a long-running split rewrite,
cancelling the query stops only the base branch while the MV branch keeps
running until it times out or completes. We need to either cancel both request
ids or reuse one cancellable id for both branches.
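
One way to act on this comment, sketched under assumptions: keep a per-query registry that maps the broker-facing `requestId` to every scatter-gather id launched for it, and have the cancel path fan out to all of them. `SplitCancellationRegistry`, its methods, and the `LongConsumer` used in place of the per-id DELETE fan-out are hypothetical illustrations, not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

/// Hypothetical registry (not a Pinot API): tracks every scatter-gather id launched
/// for one broker request so a single cancel reaches both split branches.
public class SplitCancellationRegistry {
  private final Map<Long, Set<Long>> _branchIds = new ConcurrentHashMap<>();

  /// Register an extra branch id (e.g. the MV-side request id) under its parent request.
  public void register(long parentRequestId, long branchRequestId) {
    _branchIds.computeIfAbsent(parentRequestId, k -> ConcurrentHashMap.newKeySet()).add(branchRequestId);
  }

  /// Drop tracking once the query finishes normally.
  public void unregister(long parentRequestId) {
    _branchIds.remove(parentRequestId);
  }

  /// Cancel the parent and every registered branch; `sender` stands in for the
  /// per-id cancel fan-out. Returns how many ids were cancelled.
  public int cancel(long parentRequestId, LongConsumer sender) {
    sender.accept(parentRequestId);
    int cancelled = 1;
    Set<Long> branches = _branchIds.remove(parentRequestId);
    if (branches != null) {
      for (long id : branches) {
        if (id != parentRequestId) {
          sender.accept(id);
          cancelled++;
        }
      }
    }
    return cancelled;
  }

  public static void main(String[] args) {
    SplitCancellationRegistry registry = new SplitCancellationRegistry();
    registry.register(42L, 43L); // 43 plays the role of the MV branch's request id
    List<Long> sent = new ArrayList<>();
    int n = registry.cancel(42L, id -> sent.add(id));
    System.out.println(n + " " + sent); // prints "2 [42, 43]"
  }
}
```

The other option the comment raises, reusing a single cancellable id for both branches, would make a registry like this unnecessary at the cost of sharing one id across two scatter-gathers.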


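The fallback contract described in the `tryExecuteMaterializedViewSplit` javadoc in the first hunk (count every execute-time failure, log expected MV churn at WARN and genuine bugs at ERROR, and return `null` so the caller runs the base-table path) can be illustrated with a small, self-contained sketch. `RouteMissingException` is a hypothetical stand-in for a typed `QueryException` such as `BROKER_RESOURCE_MISSING`; the `AtomicLong` and `System.err` calls are stand-ins for `_brokerMetrics` and `LOGGER`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class SplitFallback {
  /// Hypothetical typed, expected failure (e.g. the MV table was dropped mid-query).
  public static class RouteMissingException extends Exception {
    public RouteMissingException(String message) {
      super(message);
    }
  }

  /// Stand-in for the QUERY_REWRITE_EXCEPTIONS meter.
  public static final AtomicLong REWRITE_EXCEPTIONS = new AtomicLong();

  /// Runs the split plan; any failure is counted and mapped to null ("fall through").
  static <T> T tryExecuteSplit(Callable<T> split) {
    try {
      return split.call();
    } catch (RouteMissingException expected) {
      // Routine MV churn: one WARN line, no stack trace.
      System.err.println("WARN split skipped: " + expected.getMessage());
      REWRITE_EXCEPTIONS.incrementAndGet();
      return null;
    } catch (Exception unexpected) {
      // Genuine bug (e.g. an NPE in a strategy plan): log loudly, but never fail the query.
      System.err.println("ERROR split failed: " + unexpected);
      REWRITE_EXCEPTIONS.incrementAndGet();
      return null;
    }
  }

  /// Caller side: a null split result means "run the ordinary base-table path instead".
  public static <T> T execute(Callable<T> split, Supplier<T> basePath) {
    T result = tryExecuteSplit(split);
    return result != null ? result : basePath.get();
  }

  public static void main(String[] args) {
    String answer = execute(() -> { throw new RouteMissingException("MV dropped"); }, () -> "base-table result");
    // stdout: "base-table result / rewrite exceptions = 1"
    System.out.println(answer + " / rewrite exceptions = " + REWRITE_EXCEPTIONS.get());
  }
}
```

Splitting the catch into a typed branch and a generic branch is what lets expected churn stay out of the ERROR log while still never surfacing as an HTTP 500.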

##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -308,15 +308,32 @@ public Map<String, String> getMaterializedViewTaskConfigs() {
     return _taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
   }
 
-  /// Whether the table has a non-empty `definedSQL` under [CommonConstants.MaterializedViewTask#TASK_TYPE].
+  /// Whether the table has a non-empty `definedSQL` under any registered MV-style task type.
+  ///
+  /// Recognizes both the built-in OSS [CommonConstants.MaterializedViewTask#TASK_TYPE] and any
+  /// downstream task variant (e.g. StarTree's `MseMaterializedViewTask` for multi-stage MVs) by
+  /// scanning every task config for a non-empty `definedSQL` key. This keeps the
+  /// `isMaterializedView=true` invariant compatible with the wider ecosystem without OSS having to
+  /// hard-code each variant's task-type label.
   @JsonIgnore
   public boolean hasMaterializedViewTaskWithDefinedSql() {

Review Comment:
   `hasMaterializedViewTaskWithDefinedSql()` now treats any task config with 
`definedSQL` as MV identity, but `validateMaterializedViewConfigUpdate()` still 
reads `definedSQL` only from the built-in `MaterializedViewTask`. That lets a 
downstream MV-style task change `definedSQL` in place without tripping the 
immutability guard, even though the persisted MV data was built from the old 
definition. The update path needs the same generic task scan as this helper.
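
A minimal sketch of the generic scan the update guard could share with `hasMaterializedViewTaskWithDefinedSql()`: collect every non-empty `definedSQL`, keyed by task type, from the old and new configs, and reject any in-place change. It assumes the task configs are available as a `Map<String, Map<String, String>>` from task type to config; `DefinedSqlGuard` and its methods are hypothetical and are not the existing `validateMaterializedViewConfigUpdate()`.

```java
import java.util.HashMap;
import java.util.Map;

/// Hypothetical helper: a `definedSQL` immutability check across all task types.
public class DefinedSqlGuard {
  static final String DEFINED_SQL_KEY = "definedSQL";

  /// Collects every non-empty `definedSQL`, keyed by task type, scanning all task
  /// types rather than only the built-in MaterializedViewTask.
  public static Map<String, String> definedSqlByTaskType(Map<String, Map<String, String>> taskConfigs) {
    Map<String, String> result = new HashMap<>();
    if (taskConfigs == null) {
      return result;
    }
    for (Map.Entry<String, Map<String, String>> entry : taskConfigs.entrySet()) {
      Map<String, String> config = entry.getValue();
      String sql = config == null ? null : config.get(DEFINED_SQL_KEY);
      if (sql != null && !sql.isEmpty()) {
        result.put(entry.getKey(), sql);
      }
    }
    return result;
  }

  /// Rejects an update that changes an existing `definedSQL` in place, whatever the task type.
  public static void validateDefinedSqlUnchanged(Map<String, Map<String, String>> oldTaskConfigs,
      Map<String, Map<String, String>> newTaskConfigs) {
    Map<String, String> before = definedSqlByTaskType(oldTaskConfigs);
    Map<String, String> after = definedSqlByTaskType(newTaskConfigs);
    for (Map.Entry<String, String> entry : before.entrySet()) {
      String updated = after.get(entry.getKey());
      if (updated != null && !updated.equals(entry.getValue())) {
        throw new IllegalArgumentException("definedSQL for task type " + entry.getKey()
            + " cannot change after the materialized view has been built");
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> oldConfigs =
        Map.of("MseMaterializedViewTask", Map.of(DEFINED_SQL_KEY, "SELECT a, SUM(b) FROM t GROUP BY a"));
    Map<String, Map<String, String>> newConfigs =
        Map.of("MseMaterializedViewTask", Map.of(DEFINED_SQL_KEY, "SELECT a, MAX(b) FROM t GROUP BY a"));
    try {
      validateDefinedSqlUnchanged(oldConfigs, newConfigs);
      System.out.println("accepted");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Whether removing `definedSQL` entirely, or moving it to a different task type, should also be rejected is a policy question this sketch deliberately leaves open.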



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
