hongkunxu commented on code in PR #18544:
URL: https://github.com/apache/pinot/pull/18544#discussion_r3301253442


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -5374,6 +5434,95 @@ public static void main(String[] args) throws Exception {
 
   // ── MV Consistency Manager helpers ──
 
+  /// Persists [MaterializedViewDefinitionMetadata] to ZooKeeper for an MV table at create
+  /// time so [#notifyMaterializedViewConsistencyManagerForTableCreate] can register the MV
+  /// against the authoritative `baseTables` list rather than depending on its `definedSQL`
+  /// re-parse fallback. The scheduler's cold-start path will also find the znode and skip
+  /// its own lazy initialisation (see
+  /// [org.apache.pinot.materializedview.scheduler.MaterializedViewTaskScheduler#getWatermarkMs]).
+  ///
+  /// Write semantics:
+  ///
+  ///   - **Best-effort**: this runs after the table is otherwise fully set up
+  ///       (TableConfig persisted, ideal state assigned, BrokerResource updated). Throwing
+  ///       here would leave the cluster in a half-committed state — the table is already
+  ///       visible to brokers and the rollback handler at the top of `addTable` has been
+  ///       passed. Any exception is logged at WARN and we continue; the notify path's
+  ///       `extractSourceTableName` fallback keeps registration correct.
+  ///   - **`createIfAbsent`**: an existing znode from a prior scheduler cold-start or a
+  ///       prior CREATE retry is left in place. The scheduler's createIfAbsent uses the same
+  ///       contract, and the
+  ///       [MaterializedViewDefinitionMetadataBuilder] produces byte-identical content for
+  ///       the same MV, so an existing znode is by construction equivalent to what we would
+  ///       write.
+  ///   - **Non-MV tables**: early-returns without any ZK round-trips.
+  private void persistMaterializedViewDefinitionMetadataBestEffort(TableConfig tableConfig) {
+    if (!tableConfig.isMaterializedView()) {
+      return;
+    }
+    String tableNameWithType = tableConfig.getTableName();
+    try {
+      Map<String, String> taskConfigs = tableConfig.getMaterializedViewTaskConfigs();
+      if (taskConfigs == null) {
+        LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist",
+            tableNameWithType);
+        return;
+      }
+      String definedSql = taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+      if (definedSql == null || definedSql.isEmpty()) {
+        LOGGER.warn("MV table {} has no definedSQL; skipping definition metadata persist", tableNameWithType);
+        return;
+      }
+      // The source table name MUST be a simple identifier — `MaterializedViewAnalyzer` rejects
+      // anything else at validate time, so this call is safe for any MV that reached addTable.
+      String sourceRawTableName = MaterializedViewAnalyzer.extractSourceTableName(definedSql);
+
+      String sourceTableWithType = TableNameBuilder.OFFLINE.tableNameWithType(sourceRawTableName);
+      TableConfig sourceTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, sourceTableWithType);
+      if (sourceTableConfig == null) {
+        sourceTableWithType = TableNameBuilder.REALTIME.tableNameWithType(sourceRawTableName);
+        sourceTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, sourceTableWithType);
+      }
+      if (sourceTableConfig == null) {
+        LOGGER.warn("MV table {} source table '{}' not found in OFFLINE or REALTIME variants; "
+            + "skipping definition metadata persist", tableNameWithType, sourceRawTableName);
+        return;
+      }
+      Schema sourceSchema = ZKMetadataProvider.getSchema(_propertyStore, sourceRawTableName);
+      if (sourceSchema == null) {
+        LOGGER.warn("MV table {} source table '{}' has no schema; skipping definition metadata persist",
+            tableNameWithType, sourceRawTableName);
+        return;
+      }
+      String viewRawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      Schema viewSchema = ZKMetadataProvider.getSchema(_propertyStore, viewRawTableName);
+      if (viewSchema == null) {
+        LOGGER.warn("MV table {} has no schema; skipping definition metadata persist", tableNameWithType);
+        return;
+      }
+
+      // partitionExprMaps comes from analyzer post-validation; we re-extract from the
+      // already-validated definedSQL + viewSchema, which is cheap (single Calcite parse).
+      Map<String, String> partitionExprMaps =
+          MaterializedViewAnalyzer.extractPartitionExprMaps(definedSql, viewSchema);
+
+      MaterializedViewDefinitionMetadata definition = MaterializedViewDefinitionMetadataBuilder.build(
+          tableNameWithType, tableConfig, sourceTableConfig, sourceSchema, sourceRawTableName,
+          definedSql, partitionExprMaps);
+      if (MaterializedViewDefinitionMetadataUtils.createIfAbsent(_propertyStore, definition)) {
+        LOGGER.info("Adding table {}: Persisted MV definition metadata (baseTables={})",
+            tableNameWithType, definition.getBaseTables());
+      } else {
+        LOGGER.info("Adding table {}: MV definition metadata already exists; leaving in place",
+            tableNameWithType);
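
The best-effort, `createIfAbsent` contract in the hunk above can be modelled without ZooKeeper. This is a hypothetical, self-contained sketch, not Pinot code: a `ConcurrentHashMap` stands in for the property store, `putIfAbsent` stands in for `MaterializedViewDefinitionMetadataUtils.createIfAbsent`, and the `_OFFLINE`/`_REALTIME` suffixes are assumed to mirror what `TableNameBuilder` produces. It shows the OFFLINE-then-REALTIME source lookup, first-writer-wins persistence, and failures being logged and swallowed instead of rethrown.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Logger;

/**
 * Hypothetical model of the best-effort persist. None of these names are Pinot APIs;
 * the maps stand in for ZooKeeper-backed table configs and definition znodes.
 */
final class BestEffortMvPersistSketch {
  private static final Logger LOG = Logger.getLogger("BestEffortMvPersistSketch");

  /** Outcome of one persist attempt; the caller never sees an exception. */
  enum Outcome { CREATED, ALREADY_EXISTS, SKIPPED, FAILED }

  // tableNameWithType -> table config placeholder (assumed suffix convention: _OFFLINE / _REALTIME)
  final Map<String, String> tableConfigs = new ConcurrentHashMap<>();
  // MV tableNameWithType -> serialized definition (stands in for the definition znode)
  final Map<String, String> definitions = new ConcurrentHashMap<>();

  /** Tries the OFFLINE variant first, then REALTIME, mirroring the lookup order in the hunk. */
  Optional<String> resolveSourceTable(String rawName) {
    for (String suffix : new String[] {"_OFFLINE", "_REALTIME"}) {
      String withType = rawName + suffix;
      if (tableConfigs.containsKey(withType)) {
        return Optional.of(withType);
      }
    }
    return Optional.empty();
  }

  /** Best-effort: every failure is logged and swallowed so table creation is never undone. */
  Outcome persistBestEffort(String mvTableWithType, String sourceRawName,
      Function<String, String> buildDefinition) {
    try {
      Optional<String> source = resolveSourceTable(sourceRawName);
      if (source.isEmpty()) {
        LOG.warning("source table '" + sourceRawName + "' not found; skipping persist");
        return Outcome.SKIPPED;
      }
      String definition = buildDefinition.apply(source.get());
      // createIfAbsent: putIfAbsent returns null only when no value was present,
      // so an existing definition is left untouched.
      return definitions.putIfAbsent(mvTableWithType, definition) == null
          ? Outcome.CREATED : Outcome.ALREADY_EXISTS;
    } catch (RuntimeException e) {
      LOG.warning("persist failed for " + mvTableWithType + ": " + e);
      return Outcome.FAILED;
    }
  }
}
```

A second `persistBestEffort` call for the same MV returns `ALREADY_EXISTS` and keeps the first payload, which is only safe because, as the Javadoc states, every writer produces equivalent content for a given MV.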

Review Comment:
   Addressed in the latest push: `PinotHelixResourceManager#backfillMaterializedViewReverseIndex`
   runs at controller startup (wired in `BaseControllerStarter` between
   `MaterializedViewConsistencyManager#init` and `register*`) and walks the authoritative
   `TableConfig#isMaterializedView()` list rather than the definition-znode children. As a
   result, both legacy MVs and MVs whose in-session persist failed are recovered into the
   reverse index via `extractSourceTableName(definedSQL)` (the same fallback as the in-session
   create path) before the delete guard in `deleteTable` ever runs.

   I also added the symmetric `extractSourceTableName` fallback to
   `notifyMaterializedViewConsistencyManagerForTableDrop`, so a backfilled MV whose phase-2
   znode write also fails is still cleaned up correctly on drop. Tests are in
   `PinotHelixResourceManagerMaterializedViewBackfillTest`.
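
The startup backfill and the symmetric drop fallback described in this reply can be sketched the same way. This is a hypothetical model, not the code in the PR: `TableInfo` stands in for `TableConfig`, a map from MV name to `baseTables` stands in for the definition znodes, and `extractSourceTableName` here is a naive regex taking the first identifier after `FROM` (the PR uses `MaterializedViewAnalyzer.extractSourceTableName`). It illustrates why iterating table configs, rather than definition znodes, indexes MVs that never got a znode, and why resolving sources the same way on drop keeps the delete guard from going stale.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical model of the reverse-index backfill; not Pinot's API. */
final class MvReverseIndexBackfillSketch {
  /** Minimal stand-in for a TableConfig: only what the backfill needs. */
  record TableInfo(String name, boolean materializedView, String definedSql) { }

  // Naive stand-in for extractSourceTableName: first simple identifier after FROM.
  private static final Pattern FROM = Pattern.compile("(?i)\\bFROM\\s+([A-Za-z_][A-Za-z0-9_]*)");

  final Map<String, TableInfo> tableConfigs = new HashMap<>();          // authoritative list
  final Map<String, List<String>> definitionBaseTables = new HashMap<>(); // MV -> persisted baseTables
  final Map<String, Set<String>> reverseIndex = new HashMap<>();          // source raw name -> MVs

  static String extractSourceTableName(String definedSql) {
    Matcher m = FROM.matcher(definedSql);
    if (!m.find()) {
      throw new IllegalArgumentException("no FROM clause: " + definedSql);
    }
    return m.group(1);
  }

  /** Persisted baseTables when the znode exists, otherwise the definedSQL fallback. */
  List<String> sourcesOf(TableInfo mv) {
    List<String> persisted = definitionBaseTables.get(mv.name());
    return persisted != null ? persisted : List.of(extractSourceTableName(mv.definedSql()));
  }

  /** Walks table configs, not definition znodes, so MVs without a znode are still indexed. */
  void backfill() {
    for (TableInfo t : tableConfigs.values()) {
      if (t.materializedView()) {
        for (String src : sourcesOf(t)) {
          reverseIndex.computeIfAbsent(src, k -> new HashSet<>()).add(t.name());
        }
      }
    }
  }

  /** Delete guard: a source table cannot be dropped while MVs still depend on it. */
  boolean canDeleteSource(String sourceRawName) {
    return reverseIndex.getOrDefault(sourceRawName, Set.of()).isEmpty();
  }

  /** Drop path resolves sources the same way, so an MV without a znode is still unregistered. */
  void onMvDrop(String mvName) {
    TableInfo mv = tableConfigs.remove(mvName);
    if (mv == null) {
      return;
    }
    for (String src : sourcesOf(mv)) {
      Set<String> dependents = reverseIndex.get(src);
      if (dependents != null) {
        dependents.remove(mvName);
        if (dependents.isEmpty()) {
          reverseIndex.remove(src);
        }
      }
    }
    definitionBaseTables.remove(mvName);
  }
}
```

If the drop path consulted only the znode, an MV indexed through the fallback would never be removed from the reverse index, and its source table would stay undeletable.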


