xiangfu0 commented on code in PR #18544:
URL: https://github.com/apache/pinot/pull/18544#discussion_r3298092748
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -5374,6 +5434,95 @@ public static void main(String[] args) throws Exception {
// ── MV Consistency Manager helpers ──
+  /// Persists [MaterializedViewDefinitionMetadata] to ZooKeeper for an MV table at create
+  /// time so [#notifyMaterializedViewConsistencyManagerForTableCreate] can register the MV
+  /// against the authoritative `baseTables` list rather than depending on its `definedSQL`
+  /// re-parse fallback. The scheduler's cold-start path will also find the znode and skip
+  /// its own lazy initialisation (see
+  /// [org.apache.pinot.materializedview.scheduler.MaterializedViewTaskScheduler#getWatermarkMs]).
+  ///
+  /// Write semantics:
+  ///
+  /// - **Best-effort**: this runs after the table is otherwise fully set up
+  ///   (TableConfig persisted, ideal state assigned, BrokerResource updated). Throwing
+  ///   here would leave the cluster in a half-committed state: the table is already
+  ///   visible to brokers and the rollback handler at the top of `addTable` has been
+  ///   passed. Any exception is logged at WARN and we continue; the notify path's
+  ///   `extractSourceTableName` fallback keeps registration correct.
+  /// - **`createIfAbsent`**: an existing znode from a prior scheduler cold-start or a
+  ///   prior CREATE retry is left in place. The scheduler's createIfAbsent uses the same
+  ///   contract, and the [MaterializedViewDefinitionMetadataBuilder] produces byte-identical
+  ///   content for the same MV, so an existing znode is by construction equivalent to what
+  ///   we would write.
+  /// - **Non-MV tables**: early-returns without any ZK round-trips.
+  private void persistMaterializedViewDefinitionMetadataBestEffort(TableConfig tableConfig) {
+    if (!tableConfig.isMaterializedView()) {
+      return;
+    }
+    String tableNameWithType = tableConfig.getTableName();
+    try {
+      Map<String, String> taskConfigs = tableConfig.getMaterializedViewTaskConfigs();
+      if (taskConfigs == null) {
+        LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist",
+            tableNameWithType);
+        return;
+      }
+      String definedSql = taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+      if (definedSql == null || definedSql.isEmpty()) {
+        LOGGER.warn("MV table {} has no definedSQL; skipping definition metadata persist", tableNameWithType);
+        return;
+      }
+      // The source table name MUST be a simple identifier: `MaterializedViewAnalyzer` rejects
+      // anything else at validate time, so this call is safe for any MV that reached addTable.
+      String sourceRawTableName = MaterializedViewAnalyzer.extractSourceTableName(definedSql);
+
+      String sourceTableWithType = TableNameBuilder.OFFLINE.tableNameWithType(sourceRawTableName);
+      TableConfig sourceTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, sourceTableWithType);
+      if (sourceTableConfig == null) {
+        sourceTableWithType = TableNameBuilder.REALTIME.tableNameWithType(sourceRawTableName);
+        sourceTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, sourceTableWithType);
+      }
+      if (sourceTableConfig == null) {
+        LOGGER.warn("MV table {} source table '{}' not found in OFFLINE or REALTIME variants; "
+            + "skipping definition metadata persist", tableNameWithType, sourceRawTableName);
+        return;
+      }
+      Schema sourceSchema = ZKMetadataProvider.getSchema(_propertyStore, sourceRawTableName);
+      if (sourceSchema == null) {
+        LOGGER.warn("MV table {} source table '{}' has no schema; skipping definition metadata persist",
+            tableNameWithType, sourceRawTableName);
+        return;
+      }
+      String viewRawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      Schema viewSchema = ZKMetadataProvider.getSchema(_propertyStore, viewRawTableName);
+      if (viewSchema == null) {
+        LOGGER.warn("MV table {} has no schema; skipping definition metadata persist", tableNameWithType);
+        return;
+      }
+
+      // partitionExprMaps comes from analyzer post-validation; we re-extract from the
+      // already-validated definedSQL + viewSchema, which is cheap (single Calcite parse).
+      Map<String, String> partitionExprMaps =
+          MaterializedViewAnalyzer.extractPartitionExprMaps(definedSql, viewSchema);
+
+      MaterializedViewDefinitionMetadata definition = MaterializedViewDefinitionMetadataBuilder.build(
+          tableNameWithType, tableConfig, sourceTableConfig, sourceSchema, sourceRawTableName,
+          definedSql, partitionExprMaps);
+      if (MaterializedViewDefinitionMetadataUtils.createIfAbsent(_propertyStore, definition)) {
+        LOGGER.info("Adding table {}: Persisted MV definition metadata (baseTables={})",
+            tableNameWithType, definition.getBaseTables());
+      } else {
+        LOGGER.info("Adding table {}: MV definition metadata already exists; leaving in place",
+            tableNameWithType);
Review Comment:
This fallback keeps registration correct within the current controller session,
but the new delete guard later trusts the reverse index rebuilt from MV
definition znodes. MVs created before definition metadata existed, or MVs whose
definition persist failed here, will be missing from that index after a
controller restart, so `DROP TABLE` can still delete the base table and orphan
the MV. We need a startup backfill or a fallback scan before relying on
definition metadata for safety.
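A minimal sketch of what a backfill-plus-fallback rebuild could look like, assuming the reverse index maps raw base table names to MV table names and that every MV's table config (with its `definedSQL`) is present even when its definition znode is not. All names here are hypothetical: `MvTable`, `MvDefinition`, `rebuildReverseIndex` and `checkCanDrop` are illustrative, the in-memory maps stand in for ZK reads, and the naive `FROM` token scan stands in for `MaterializedViewAnalyzer.extractSourceTableName`. MVs without a znode are still indexed from their `definedSQL` and collected into a list that a startup task could then persist via `createIfAbsent`.

```java
import java.util.*;

/** Hypothetical sketch: rebuild the base -> MV reverse index without trusting that every MV has a znode. */
public class MvReverseIndexBackfillSketch {

  /** Hypothetical stand-in for an MV table config: MV name plus its definedSQL. */
  record MvTable(String mvTableNameWithType, String definedSql) {}

  /** Hypothetical stand-in for persisted definition metadata (assumed to hold raw base table names). */
  record MvDefinition(String mvTableNameWithType, List<String> baseTables) {}

  /** Naive stand-in for MaterializedViewAnalyzer.extractSourceTableName: token after FROM. */
  static String extractSourceTableName(String definedSql) {
    String[] tokens = definedSql.trim().split("\\s+");
    for (int i = 0; i < tokens.length - 1; i++) {
      if (tokens[i].equalsIgnoreCase("FROM")) {
        return tokens[i + 1];
      }
    }
    throw new IllegalArgumentException("No FROM clause in: " + definedSql);
  }

  /**
   * Builds baseRawTable -> MV names from ALL MV table configs. Uses the definition znode when present;
   * otherwise re-parses definedSQL and records the MV in needsBackfill so its znode can be written later.
   */
  static Map<String, Set<String>> rebuildReverseIndex(List<MvTable> mvTables,
      Map<String, MvDefinition> definitionsByMv, List<String> needsBackfill) {
    Map<String, Set<String>> index = new HashMap<>();
    for (MvTable mv : mvTables) {
      MvDefinition def = definitionsByMv.get(mv.mvTableNameWithType());
      List<String> baseTables;
      if (def != null) {
        baseTables = def.baseTables();
      } else {
        baseTables = List.of(extractSourceTableName(mv.definedSql()));
        needsBackfill.add(mv.mvTableNameWithType());
      }
      for (String base : baseTables) {
        index.computeIfAbsent(base, k -> new TreeSet<>()).add(mv.mvTableNameWithType());
      }
    }
    return index;
  }

  /** Delete guard: refuse to drop a base table that any MV still reads from. */
  static void checkCanDrop(String baseRawTableName, Map<String, Set<String>> index) {
    Set<String> dependents = index.getOrDefault(baseRawTableName, Set.of());
    if (!dependents.isEmpty()) {
      throw new IllegalStateException("Cannot drop " + baseRawTableName + "; referenced by MVs " + dependents);
    }
  }

  public static void main(String[] args) {
    List<MvTable> mvTables = List.of(
        new MvTable("mv_daily_OFFLINE", "SELECT day, SUM(x) FROM events GROUP BY day"),
        new MvTable("mv_legacy_OFFLINE", "SELECT k, COUNT(*) FROM clicks GROUP BY k"));
    // Only the first MV has a definition znode; the second predates the feature.
    Map<String, MvDefinition> defs =
        Map.of("mv_daily_OFFLINE", new MvDefinition("mv_daily_OFFLINE", List.of("events")));
    List<String> needsBackfill = new ArrayList<>();
    Map<String, Set<String>> index = rebuildReverseIndex(mvTables, defs, needsBackfill);
    System.out.println(index.get("clicks")); // [mv_legacy_OFFLINE]
    System.out.println(needsBackfill);       // [mv_legacy_OFFLINE]
    try {
      checkCanDrop("clicks", index);
    } catch (IllegalStateException e) {
      System.out.println("blocked: " + e.getMessage());
    }
  }
}
```

Because the index is rebuilt from every MV table config rather than only from definition znodes, a missing or failed persist degrades to a re-parse instead of an unguarded `DROP TABLE`.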
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.