hongkunxu commented on code in PR #18544:
URL: https://github.com/apache/pinot/pull/18544#discussion_r3301253442
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -5374,6 +5434,95 @@ public static void main(String[] args) throws Exception {
// ── MV Consistency Manager helpers ──
+  /// Persists [MaterializedViewDefinitionMetadata] to ZooKeeper for an MV table at create
+  /// time so [#notifyMaterializedViewConsistencyManagerForTableCreate] can register the MV
+  /// against the authoritative `baseTables` list rather than depending on its `definedSQL`
+  /// re-parse fallback. The scheduler's cold-start path will also find the znode and skip
+  /// its own lazy initialisation (see
+  /// [org.apache.pinot.materializedview.scheduler.MaterializedViewTaskScheduler#getWatermarkMs]).
+  ///
+  /// Write semantics:
+  ///
+  /// - **Best-effort**: this runs after the table is otherwise fully set up
+  ///   (TableConfig persisted, ideal state assigned, BrokerResource updated). Throwing
+  ///   here would leave the cluster in a half-committed state — the table is already
+  ///   visible to brokers and the rollback handler at the top of `addTable` has been
+  ///   passed. Any exception is logged at WARN and we continue; the notify path's
+  ///   `extractSourceTableName` fallback keeps registration correct.
+  /// - **`createIfAbsent`**: an existing znode from a prior scheduler cold-start or a
+  ///   prior CREATE retry is left in place. The scheduler's createIfAbsent uses the same
+  ///   contract, and the
+  ///   [MaterializedViewDefinitionMetadataBuilder] produces byte-identical content for
+  ///   the same MV, so an existing znode is by construction equivalent to what we would
+  ///   write.
+  /// - **Non-MV tables**: early-returns without any ZK round-trips.
+  private void persistMaterializedViewDefinitionMetadataBestEffort(TableConfig tableConfig) {
+    if (!tableConfig.isMaterializedView()) {
+      return;
+    }
+    String tableNameWithType = tableConfig.getTableName();
+    try {
+      Map<String, String> taskConfigs = tableConfig.getMaterializedViewTaskConfigs();
+      if (taskConfigs == null) {
+        LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist",
+            tableNameWithType);
+        return;
+      }
+      String definedSql = taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+      if (definedSql == null || definedSql.isEmpty()) {
+        LOGGER.warn("MV table {} has no definedSQL; skipping definition metadata persist", tableNameWithType);
+        return;
+      }
+      // The source table name MUST be a simple identifier — `MaterializedViewAnalyzer` rejects
+      // anything else at validate time, so this call is safe for any MV that reached addTable.
+      String sourceRawTableName = MaterializedViewAnalyzer.extractSourceTableName(definedSql);
+
+      String sourceTableWithType = TableNameBuilder.OFFLINE.tableNameWithType(sourceRawTableName);
+      TableConfig sourceTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, sourceTableWithType);
+      if (sourceTableConfig == null) {
+        sourceTableWithType = TableNameBuilder.REALTIME.tableNameWithType(sourceRawTableName);
+        sourceTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, sourceTableWithType);
+      }
+      if (sourceTableConfig == null) {
+        LOGGER.warn("MV table {} source table '{}' not found in OFFLINE or REALTIME variants; "
+            + "skipping definition metadata persist", tableNameWithType, sourceRawTableName);
+        return;
+      }
+      Schema sourceSchema = ZKMetadataProvider.getSchema(_propertyStore, sourceRawTableName);
+      if (sourceSchema == null) {
+        LOGGER.warn("MV table {} source table '{}' has no schema; skipping definition metadata persist",
+            tableNameWithType, sourceRawTableName);
+        return;
+      }
+      String viewRawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      Schema viewSchema = ZKMetadataProvider.getSchema(_propertyStore, viewRawTableName);
+      if (viewSchema == null) {
+        LOGGER.warn("MV table {} has no schema; skipping definition metadata persist", tableNameWithType);
+        return;
+      }
+
+      // partitionExprMaps comes from analyzer post-validation; we re-extract from the
+      // already-validated definedSQL + viewSchema, which is cheap (single Calcite parse).
+      Map<String, String> partitionExprMaps =
+          MaterializedViewAnalyzer.extractPartitionExprMaps(definedSql, viewSchema);
+
+      MaterializedViewDefinitionMetadata definition = MaterializedViewDefinitionMetadataBuilder.build(
+          tableNameWithType, tableConfig, sourceTableConfig, sourceSchema, sourceRawTableName,
+          definedSql, partitionExprMaps);
+      if (MaterializedViewDefinitionMetadataUtils.createIfAbsent(_propertyStore, definition)) {
+        LOGGER.info("Adding table {}: Persisted MV definition metadata (baseTables={})",
+            tableNameWithType, definition.getBaseTables());
+      } else {
+        LOGGER.info("Adding table {}: MV definition metadata already exists; leaving in place",
+            tableNameWithType);
Review Comment:
Addressed in the latest push.

`PinotHelixResourceManager#backfillMaterializedViewReverseIndex` now runs at controller startup (wired in `BaseControllerStarter` between `MaterializedViewConsistencyManager#init` and `register*`). It walks the authoritative list of tables whose `TableConfig#isMaterializedView()` is true, not the children of the definition znode. Legacy MVs and MVs whose in-session persist failed are therefore both recovered into the reverse index via `extractSourceTableName(definedSQL)` (the same fallback as the in-session create path) before the delete guard in `deleteTable` ever runs.

A symmetric `extractSourceTableName` fallback is also added to `notifyMaterializedViewConsistencyManagerForTableDrop`, so a backfilled MV whose phase-2 znode write also fails is still cleaned up correctly on drop.

Tests are in `PinotHelixResourceManagerMaterializedViewBackfillTest`.
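
For readers following the thread, here is a minimal sketch of the recovery order described above. It is not the code from this PR: `MvBackfillSketch`, `TableInfo`, `backfill`, `canDelete`, and `naiveSourceTable` are hypothetical stand-ins, the reverse index is assumed to be keyed by raw source table name, and the "parser" is a toy that only reads the token after `FROM`. It only illustrates the idea of walking the table list, preferring persisted `baseTables`, and falling back to the defining SQL when no definition metadata exists.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

/** Illustrative only: hypothetical stand-in types, not Pinot classes. */
final class MvBackfillSketch {

  /** Minimal stand-in for a table config: name, MV flag, and the MV's defining SQL. */
  public static final class TableInfo {
    public final String tableNameWithType;
    public final boolean materializedView;
    public final String definedSql;

    public TableInfo(String tableNameWithType, boolean materializedView, String definedSql) {
      this.tableNameWithType = tableNameWithType;
      this.materializedView = materializedView;
      this.definedSql = definedSql;
    }
  }

  /**
   * Rebuilds the reverse index (source table -> MVs built on it) by walking the
   * full table list rather than the persisted definition entries, so MVs with
   * no persisted definition are still registered.
   */
  public static Map<String, Set<String>> backfill(List<TableInfo> allTables,
      Map<String, List<String>> persistedBaseTables,
      Function<String, String> extractSourceTableName) {
    Map<String, Set<String>> reverseIndex = new HashMap<>();
    for (TableInfo table : allTables) {
      if (!table.materializedView) {
        continue;
      }
      // Prefer the persisted baseTables list when it exists.
      List<String> baseTables = persistedBaseTables.get(table.tableNameWithType);
      if (baseTables == null) {
        // Fallback: derive the single source table from the defining SQL.
        baseTables = List.of(extractSourceTableName.apply(table.definedSql));
      }
      for (String base : baseTables) {
        reverseIndex.computeIfAbsent(base, k -> new TreeSet<>()).add(table.tableNameWithType);
      }
    }
    return reverseIndex;
  }

  /** Delete guard: a source table may be dropped only if no MV depends on it. */
  public static boolean canDelete(String rawTableName, Map<String, Set<String>> reverseIndex) {
    Set<String> dependents = reverseIndex.get(rawTableName);
    return dependents == null || dependents.isEmpty();
  }

  /** Toy replacement for a real SQL parse: returns the token that follows FROM. */
  public static String naiveSourceTable(String sql) {
    String[] tokens = sql.trim().split("\\s+");
    for (int i = 0; i < tokens.length - 1; i++) {
      if (tokens[i].equalsIgnoreCase("FROM")) {
        return tokens[i + 1];
      }
    }
    throw new IllegalArgumentException("No FROM clause in: " + sql);
  }

  public static void main(String[] args) {
    List<TableInfo> tables = List.of(
        new TableInfo("orders_OFFLINE", false, null),
        new TableInfo("orders_mv_OFFLINE", true, "SELECT city, COUNT(*) FROM orders GROUP BY city"));
    // No persisted definitions at all: every MV goes through the SQL fallback.
    Map<String, Set<String>> index = backfill(tables, Map.of(), MvBackfillSketch::naiveSourceTable);
    System.out.println(index + " canDelete(orders)=" + canDelete("orders", index));
  }
}
```

The point of the ordering is that the guard consults an index that was populated before any delete request can arrive, so an MV with a missing definition entry still blocks deletion of its source table.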