rymarm commented on code in PR #3036:
URL: https://github.com/apache/drill/pull/3036#discussion_r3442457008


##########
exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/MaterializedView.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.dotdrill;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Represents a materialized view definition stored as a JSON file with
+ * .materialized_view.drill extension. The actual data is stored separately
+ * in Parquet format in the workspace directory.
+ */
+@JsonTypeName("materialized_view")
+public class MaterializedView {
+
+  /**
+   * Represents the refresh status of the materialized view.
+   */
+  public enum RefreshStatus {
+    /** The materialized view data is complete and up-to-date with its 
definition */
+    COMPLETE,
+    /** The materialized view data needs to be refreshed */
+    INCOMPLETE
+  }
+
+  private final String name;
+  private String sql;
+  private List<View.Field> fields;
+
+  /** Current schema when materialized view is created (not the schema to 
which view belongs to) */
+  private List<String> workspaceSchemaPath;
+
+  /** The relative path where the materialized data is stored (typically the 
view name) */
+  @JsonInclude(Include.NON_NULL)
+  private String dataStoragePath;

Review Comment:
   In 
`exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMaterializedViewTable.java`
 you specified, that the MV data is stored within `{name}_mv_data/`:
   ```
    * A materialized view stores:
    * <ul>
    *   <li>Definition file (.materialized_view.drill) - JSON with name, SQL, 
schema info</li>
    *   <li>Data directory ({name}_mv_data/) - Parquet files with pre-computed 
results</li>
    * </ul>
   ```
   
   In the unit tests, you used the MV name for the data storage, but at the 
same time, you used `{name}_mv_data` in the following places:
   - 
`exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMaterializedViewTable.java`
   - 
`exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java`
   - 
`exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java`
   
   We need to agree on what name pattern to use: `{name}_mv_data` or simply 
`{name}`. 



##########
exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/MaterializedView.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.dotdrill;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Represents a materialized view definition stored as a JSON file with
+ * .materialized_view.drill extension. The actual data is stored separately
+ * in Parquet format in the workspace directory.
+ */
+@JsonTypeName("materialized_view")
+public class MaterializedView {
+
+  /**
+   * Represents the refresh status of the materialized view.
+   */
+  public enum RefreshStatus {
+    /** The materialized view data is complete and up-to-date with its 
definition */
+    COMPLETE,
+    /** The materialized view data needs to be refreshed */
+    INCOMPLETE
+  }
+
+  private final String name;
+  private String sql;
+  private List<View.Field> fields;
+
+  /** Current schema when materialized view is created (not the schema to 
which view belongs to) */
+  private List<String> workspaceSchemaPath;
+
+  /** The relative path where the materialized data is stored (typically the 
view name) */
+  @JsonInclude(Include.NON_NULL)
+  private String dataStoragePath;
+
+  /** Timestamp of the last successful refresh in milliseconds since epoch */
+  @JsonInclude(Include.NON_NULL)
+  private Long lastRefreshTime;
+
+  /** Current refresh status of the materialized view */
+  @JsonInclude(Include.NON_NULL)
+  private RefreshStatus refreshStatus;
+
+  public MaterializedView(String name, String sql, RelDataType rowType, 
List<String> workspaceSchemaPath) {
+    this(name,
+        sql,
+        rowType.getFieldList().stream()
+            .map(f -> new View.Field(f.getName(), f.getType()))
+            .collect(Collectors.toList()),
+        workspaceSchemaPath,
+        name,  // data storage path defaults to view name

Review Comment:
   We need to agree on what name pattern to use by default: `{name}_mv_data` or 
simply `{name}`.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed

Review Comment:
   Actually, `findCandidateMaterializedViews()` returns both refreshed and 
unrefreshed MVs.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed
+    List<MaterializedViewCandidate> candidates = 
findCandidateMaterializedViews();
+
+    if (candidates.isEmpty()) {
+      logger.debug("No refreshed materialized views available for rewriting");
+      return queryRel;
+    }
+
+    logger.debug("Found {} materialized view candidates for potential 
rewriting", candidates.size());
+
+    // Build Calcite RelOptMaterialization objects for each refreshed candidate
+    List<RelOptMaterialization> materializations = new ArrayList<>();
+    for (MaterializedViewCandidate candidate : candidates) {
+      if (!candidate.isRefreshed()) {
+        logger.debug("Skipping MV {} - not refreshed", candidate.getName());
+        continue;
+      }
+
+      try {
+        RelOptMaterialization mat = buildMaterialization(candidate);
+        if (mat != null) {
+          materializations.add(mat);
+        }
+      } catch (Exception e) {
+        logger.debug("Failed to build materialization for MV {}: {}", 
candidate.getName(), e.getMessage());
+      }
+    }
+
+    if (materializations.isEmpty()) {
+      logger.debug("No valid materializations could be built");
+      return queryRel;
+    }
+
+    // Use Calcite's materialized view matching API which normalizes both the
+    // query and MV definitions (trimming unused fields, converting 
Filter/Project
+    // to Calc, merging, etc.) before performing structural matching.
+    try {
+      List<Pair<RelNode, List<RelOptMaterialization>>> results =
+          RelOptMaterializations.useMaterializedViews(queryRel, 
materializations);
+
+      if (!results.isEmpty()) {
+        RelNode rewritten = results.get(0).left;
+        if (logger.isInfoEnabled()) {
+          List<RelOptMaterialization> usedMVs = results.get(0).right;
+          logger.info("Query rewritten to use materialized view(s): {}",
+              !usedMVs.isEmpty() ? usedMVs.get(0).qualifiedTableName : 
"unknown");
+        }
+        return rewritten;
+      }
+    } catch (Exception e) {
+      logger.debug("Materialized view rewriting failed: {}", e.getMessage());
+    }
+
+    logger.debug("No materialized view matched the query");
+    return queryRel;
+  }
+
+  /**
+   * Builds a Calcite {@link RelOptMaterialization} for a candidate MV.
+   */
+  private RelOptMaterialization buildMaterialization(MaterializedViewCandidate 
candidate) {
+    RelNode mvQueryRel = parseMvSql(candidate);
+    if (mvQueryRel == null) {
+      return null;
+    }
+
+    RelNode mvTableRel = buildMvScanRel(candidate);
+    if (mvTableRel == null) {
+      return null;
+    }
+
+    List<String> qualifiedTableName = java.util.Arrays.asList(

Review Comment:
   Is the fully qualified name used intentionally?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java:
##########
@@ -351,6 +354,278 @@ public void dropView(String viewName) throws IOException {
       getFS().delete(getViewPath(viewName), false);
     }
 
+    private Path getMaterializedViewPath(String name) {
+      return DotDrillType.MATERIALIZED_VIEW.getPath(config.getLocation(), 
name);
+    }
+
+    private Path getMaterializedViewDataPath(String name) {
+      // Use _mv_data suffix to distinguish data directory from MV definition 
lookup
+      return new Path(config.getLocation(), name + "_mv_data");
+    }
+
+    @Override
+    public boolean createMaterializedView(MaterializedView materializedView) 
throws IOException {
+      String viewName = materializedView.getName();
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);

Review Comment:
   Use `materializedView.getDataStoragePath()` instead.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java:
##########
@@ -181,6 +191,33 @@ public List<Records.View> views(String schemaPath, 
SchemaPlus schema) {
         .collect(Collectors.toList());
     }
 
+    @Override
+    public List<Records.MaterializedView> materializedViews(String schemaPath, 
SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+
+      return drillSchema.getTablesByNames(schema.getTableNames()).stream()
+        .filter(pair -> pair.getValue().getJdbcTableType() == 
Schema.TableType.MATERIALIZED_VIEW)
+        .filter(pair -> filterEvaluator.shouldVisitTable(schemaPath, 
pair.getKey(), pair.getValue().getJdbcTableType()))
+        .map(pair -> {
+          Table table = pair.getValue();
+          String viewSql = table instanceof DrillViewInfoProvider
+              ? ((DrillViewInfoProvider) table).getViewSql() : "";
+          String refreshStatus = null;
+          Long lastRefreshTime = null;
+          String dataLocation = null;
+          if (table instanceof DrillMaterializedViewTable) {
+            DrillMaterializedViewTable mvTable = (DrillMaterializedViewTable) 
table;

Review Comment:
   Use pattern matching feature: 
https://docs.oracle.com/en/java/javase/17/language/pattern-matching-instanceof.html.
 Drill use Java 17+



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java:
##########
@@ -351,6 +354,278 @@ public void dropView(String viewName) throws IOException {
       getFS().delete(getViewPath(viewName), false);
     }
 
+    private Path getMaterializedViewPath(String name) {
+      return DotDrillType.MATERIALIZED_VIEW.getPath(config.getLocation(), 
name);
+    }
+
+    private Path getMaterializedViewDataPath(String name) {
+      // Use _mv_data suffix to distinguish data directory from MV definition 
lookup
+      return new Path(config.getLocation(), name + "_mv_data");
+    }
+
+    @Override
+    public boolean createMaterializedView(MaterializedView materializedView) 
throws IOException {
+      String viewName = materializedView.getName();
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      boolean replaced = getFS().exists(viewPath);
+
+      // If replacing, first drop the old data
+      if (replaced) {
+        if (getFS().exists(dataPath)) {
+          getFS().delete(dataPath, true);
+        }
+      }
+
+      // Create the data directory for the materialized view
+      final FsPermission dirPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      getFS().mkdirs(dataPath, dirPerms);
+
+      // Set the data storage path in the materialized view
+      materializedView.setDataStoragePath(viewName);
+
+      // Write the materialized view definition file
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, materializedView);
+      }
+
+      // Sync to metastore if enabled
+      syncMaterializedViewToMetastore(materializedView);
+
+      // Mark as complete (data will be populated by the handler via CTAS-like 
operation)
+      return replaced;
+    }
+
+    @Override
+    public void dropMaterializedView(String viewName) throws IOException {
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      // Delete the definition file
+      if (getFS().exists(viewPath)) {
+        getFS().delete(viewPath, false);
+      }
+
+      // Delete the data directory
+      if (getFS().exists(dataPath)) {
+        getFS().delete(dataPath, true);
+      }
+
+      // Remove from metastore if enabled
+      removeMaterializedViewFromMetastore(viewName);
+    }
+
+    @Override
+    public void refreshMaterializedView(String viewName) throws IOException {
+      // Read the existing materialized view definition
+      MaterializedView mv = getMaterializedView(viewName);
+      if (mv == null) {
+        throw UserException.validationError()
+            .message("Materialized view [%s] not found in schema [%s]", 
viewName, getFullSchemaName())
+            .build(logger);
+      }
+
+      Path dataPath = getMaterializedViewDataPath(viewName);

Review Comment:
   Use `materializedView.getDataStoragePath()` instead.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMaterializedViewTable.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.planner.sql.conversion.DrillViewExpander;
+
+/**
+ * Represents a materialized view in the Drill query planning.
+ * <p>
+ * A materialized view stores:
+ * <ul>
+ *   <li>Definition file (.materialized_view.drill) - JSON with name, SQL, 
schema info</li>
+ *   <li>Data directory ({name}_mv_data/) - Parquet files with pre-computed 
results</li>
+ * </ul>
+ * <p>
+ * <b>Behavior:</b>
+ * <ul>
+ *   <li>Before REFRESH: queries expand the SQL definition (like a view)</li>
+ *   <li>After REFRESH: queries scan from pre-computed Parquet data</li>
+ * </ul>
+ *
+ * @see org.apache.drill.exec.dotdrill.MaterializedView
+ */
+public class DrillMaterializedViewTable implements TranslatableTable, 
DrillViewInfoProvider {
+
+  private final MaterializedView materializedView;
+  private final String viewOwner;
+  private final ViewExpansionContext viewExpansionContext;
+  private final String workspaceLocation;
+
+  public DrillMaterializedViewTable(MaterializedView materializedView, String 
viewOwner,
+                                    ViewExpansionContext viewExpansionContext, 
String workspaceLocation) {
+    this.materializedView = materializedView;
+    this.viewOwner = viewOwner;
+    this.viewExpansionContext = viewExpansionContext;
+    this.workspaceLocation = workspaceLocation;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return materializedView.getRowType(typeFactory);
+  }
+
+  @Override
+  public Statistic getStatistic() {
+    return Statistics.UNKNOWN;
+  }
+
+  /**
+   * Converts this materialized view to a RelNode for query planning.
+   * <p>
+   * If the MV has been refreshed (data exists), scans from the pre-computed 
Parquet data.
+   * Otherwise, expands the SQL definition like a regular view.
+   */
+  @Override
+  public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
+    DrillViewExpander viewExpander = viewExpansionContext.getViewExpander();
+    ViewExpansionContext.ViewExpansionToken token = null;
+    try {
+      RelDataType rowType = relOptTable.getRowType();
+      RelNode rel;
+
+      // Check if materialized data exists (REFRESH has been called)
+      boolean hasData = materializedView.getRefreshStatus() == 
MaterializedView.RefreshStatus.COMPLETE;
+
+      // Build the SQL to execute - either scan data or expand definition
+      String sqlToExpand;
+      if (hasData) {
+        // Scan from the pre-computed data directory
+        sqlToExpand = buildDataScanSql();
+      } else {
+        // No data yet - expand the SQL definition like a view
+        sqlToExpand = materializedView.getSql();
+      }
+
+      // Always use the workspace schema path for context - needed for table 
resolution
+      List<String> schemaPath = materializedView.getWorkspaceSchemaPath();
+
+      if (viewExpansionContext.isImpersonationEnabled()) {
+        token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
+        rel = viewExpander.expandView(sqlToExpand, token.getSchemaTree(), 
schemaPath).rel;
+      } else {
+        // When scanning data, pass null for rowType to let Parquet schema be 
inferred
+        // When expanding SQL definition, use the MV's row type
+        RelDataType typeHint = hasData ? null : rowType;
+        rel = viewExpander.expandView(typeHint, sqlToExpand, schemaPath, 
Collections.emptyList()).rel;
+      }
+
+      return rel;
+    } finally {
+      if (token != null) {
+        token.release();
+      }
+    }
+  }
+
+  /**
+   * Builds SQL to scan the materialized data directory.
+   * The data is stored in {workspace}/{mvName}_mv_data/ directory.
+   * We explicitly select the MV's columns to ensure proper schema matching.
+   */
+  private String buildDataScanSql() {
+    String dataTableName = materializedView.getName() + "_mv_data";
+
+    // Build explicit column list from the MV's field definitions
+    List<String> fieldNames = materializedView.getFields().stream()
+        .map(f -> f.getName())
+        .collect(java.util.stream.Collectors.toList());
+    if (fieldNames.isEmpty()) {
+      // Fallback to SELECT * if no fields defined (shouldn't happen for 
non-dynamic MVs)
+      return "SELECT * FROM `" + dataTableName + "`";

Review Comment:
   Apache Drill allows to configure the identifier quote character. Won't it 
fail if `planner.parser.quoting_identifiers` is set to double quotes?
   https://drill.apache.org/docs/lexical-structure#identifier-quotes
   
   And doesn't it should include either the workspace name?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMaterializedViewTable.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.planner.sql.conversion.DrillViewExpander;
+
+/**
+ * Represents a materialized view in the Drill query planning.
+ * <p>
+ * A materialized view stores:
+ * <ul>
+ *   <li>Definition file (.materialized_view.drill) - JSON with name, SQL, 
schema info</li>
+ *   <li>Data directory ({name}_mv_data/) - Parquet files with pre-computed 
results</li>
+ * </ul>
+ * <p>
+ * <b>Behavior:</b>
+ * <ul>
+ *   <li>Before REFRESH: queries expand the SQL definition (like a view)</li>
+ *   <li>After REFRESH: queries scan from pre-computed Parquet data</li>
+ * </ul>
+ *
+ * @see org.apache.drill.exec.dotdrill.MaterializedView
+ */
+public class DrillMaterializedViewTable implements TranslatableTable, 
DrillViewInfoProvider {
+
+  private final MaterializedView materializedView;
+  private final String viewOwner;
+  private final ViewExpansionContext viewExpansionContext;
+  private final String workspaceLocation;
+
+  public DrillMaterializedViewTable(MaterializedView materializedView, String 
viewOwner,
+                                    ViewExpansionContext viewExpansionContext, 
String workspaceLocation) {
+    this.materializedView = materializedView;
+    this.viewOwner = viewOwner;
+    this.viewExpansionContext = viewExpansionContext;
+    this.workspaceLocation = workspaceLocation;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return materializedView.getRowType(typeFactory);
+  }
+
+  @Override
+  public Statistic getStatistic() {
+    return Statistics.UNKNOWN;
+  }
+
+  /**
+   * Converts this materialized view to a RelNode for query planning.
+   * <p>
+   * If the MV has been refreshed (data exists), scans from the pre-computed 
Parquet data.
+   * Otherwise, expands the SQL definition like a regular view.
+   */
+  @Override
+  public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
+    DrillViewExpander viewExpander = viewExpansionContext.getViewExpander();
+    ViewExpansionContext.ViewExpansionToken token = null;
+    try {
+      RelDataType rowType = relOptTable.getRowType();
+      RelNode rel;
+
+      // Check if materialized data exists (REFRESH has been called)
+      boolean hasData = materializedView.getRefreshStatus() == 
MaterializedView.RefreshStatus.COMPLETE;
+
+      // Build the SQL to execute - either scan data or expand definition
+      String sqlToExpand;
+      if (hasData) {
+        // Scan from the pre-computed data directory
+        sqlToExpand = buildDataScanSql();
+      } else {
+        // No data yet - expand the SQL definition like a view
+        sqlToExpand = materializedView.getSql();
+      }
+
+      // Always use the workspace schema path for context - needed for table 
resolution
+      List<String> schemaPath = materializedView.getWorkspaceSchemaPath();
+
+      if (viewExpansionContext.isImpersonationEnabled()) {
+        token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
+        rel = viewExpander.expandView(sqlToExpand, token.getSchemaTree(), 
schemaPath).rel;
+      } else {
+        // When scanning data, pass null for rowType to let Parquet schema be 
inferred
+        // When expanding SQL definition, use the MV's row type
+        RelDataType typeHint = hasData ? null : rowType;
+        rel = viewExpander.expandView(typeHint, sqlToExpand, schemaPath, 
Collections.emptyList()).rel;
+      }
+
+      return rel;
+    } finally {
+      if (token != null) {
+        token.release();
+      }
+    }
+  }
+
+  /**
+   * Builds SQL to scan the materialized data directory.
+   * The data is stored in {workspace}/{mvName}_mv_data/ directory.
+   * We explicitly select the MV's columns to ensure proper schema matching.
+   */
+  private String buildDataScanSql() {
+    String dataTableName = materializedView.getName() + "_mv_data";
+
+    // Build explicit column list from the MV's field definitions
+    List<String> fieldNames = materializedView.getFields().stream()
+        .map(f -> f.getName())
+        .collect(java.util.stream.Collectors.toList());
+    if (fieldNames.isEmpty()) {
+      // Fallback to SELECT * if no fields defined (shouldn't happen for 
non-dynamic MVs)
+      return "SELECT * FROM `" + dataTableName + "`";
+    }
+
+    StringBuilder sql = new StringBuilder("SELECT ");
+    for (int i = 0; i < fieldNames.size(); i++) {
+      if (i > 0) {
+        sql.append(", ");
+      }
+      sql.append("`").append(fieldNames.get(i)).append("`");
+    }
+    sql.append(" FROM `").append(dataTableName).append("`");

Review Comment:
   The same:
   > Won't it fail if planner.parser.quoting_identifiers is set to double 
quotes?
   https://drill.apache.org/docs/lexical-structure#identifier-quotes



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed
+    List<MaterializedViewCandidate> candidates = 
findCandidateMaterializedViews();
+
+    if (candidates.isEmpty()) {
+      logger.debug("No refreshed materialized views available for rewriting");
+      return queryRel;
+    }
+
+    logger.debug("Found {} materialized view candidates for potential 
rewriting", candidates.size());
+
+    // Build Calcite RelOptMaterialization objects for each refreshed candidate
+    List<RelOptMaterialization> materializations = new ArrayList<>();
+    for (MaterializedViewCandidate candidate : candidates) {
+      if (!candidate.isRefreshed()) {
+        logger.debug("Skipping MV {} - not refreshed", candidate.getName());
+        continue;
+      }
+
+      try {
+        RelOptMaterialization mat = buildMaterialization(candidate);
+        if (mat != null) {
+          materializations.add(mat);
+        }
+      } catch (Exception e) {
+        logger.debug("Failed to build materialization for MV {}: {}", 
candidate.getName(), e.getMessage());
+      }
+    }
+
+    if (materializations.isEmpty()) {
+      logger.debug("No valid materializations could be built");
+      return queryRel;
+    }
+
+    // Use Calcite's materialized view matching API which normalizes both the
+    // query and MV definitions (trimming unused fields, converting 
Filter/Project
+    // to Calc, merging, etc.) before performing structural matching.
+    try {
+      List<Pair<RelNode, List<RelOptMaterialization>>> results =
+          RelOptMaterializations.useMaterializedViews(queryRel, 
materializations);
+
+      if (!results.isEmpty()) {
+        RelNode rewritten = results.get(0).left;
+        if (logger.isInfoEnabled()) {
+          List<RelOptMaterialization> usedMVs = results.get(0).right;
+          logger.info("Query rewritten to use materialized view(s): {}",
+              !usedMVs.isEmpty() ? usedMVs.get(0).qualifiedTableName : 
"unknown");
+        }
+        return rewritten;
+      }
+    } catch (Exception e) {
+      logger.debug("Materialized view rewriting failed: {}", e.getMessage());
+    }
+
+    logger.debug("No materialized view matched the query");
+    return queryRel;
+  }
+
+  /**
+   * Builds a Calcite {@link RelOptMaterialization} for a candidate MV.
+   */
+  private RelOptMaterialization buildMaterialization(MaterializedViewCandidate 
candidate) {
+    RelNode mvQueryRel = parseMvSql(candidate);
+    if (mvQueryRel == null) {
+      return null;
+    }
+
+    RelNode mvTableRel = buildMvScanRel(candidate);
+    if (mvTableRel == null) {
+      return null;
+    }
+
+    List<String> qualifiedTableName = java.util.Arrays.asList(
+        candidate.getSchemaPath().split("\\."));
+
+    return new RelOptMaterialization(
+        mvTableRel,
+        mvQueryRel,
+        null,
+        qualifiedTableName
+    );
+  }
+
+  /**
+   * Parses the MV's SQL definition into a RelNode.
+   */
+  private RelNode parseMvSql(MaterializedViewCandidate candidate) {
+    try {
+      String mvSql = candidate.getSql();
+      org.apache.calcite.sql.SqlNode parsedNode = sqlConverter.parse(mvSql);
+      org.apache.calcite.sql.SqlNode validatedNode = 
sqlConverter.validate(parsedNode);

Review Comment:
   Is the fully qualified name used intentionally?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMaterializedViewTable.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.planner.sql.conversion.DrillViewExpander;
+
+/**
+ * Represents a materialized view in the Drill query planning.
+ * <p>
+ * A materialized view stores:
+ * <ul>
+ *   <li>Definition file (.materialized_view.drill) - JSON with name, SQL, 
schema info</li>
+ *   <li>Data directory ({name}_mv_data/) - Parquet files with pre-computed 
results</li>
+ * </ul>
+ * <p>
+ * <b>Behavior:</b>
+ * <ul>
+ *   <li>Before REFRESH: queries expand the SQL definition (like a view)</li>
+ *   <li>After REFRESH: queries scan from pre-computed Parquet data</li>
+ * </ul>
+ *
+ * @see org.apache.drill.exec.dotdrill.MaterializedView
+ */
+public class DrillMaterializedViewTable implements TranslatableTable, 
DrillViewInfoProvider {
+
+  private final MaterializedView materializedView;
+  private final String viewOwner;
+  private final ViewExpansionContext viewExpansionContext;
+  private final String workspaceLocation;
+
+  public DrillMaterializedViewTable(MaterializedView materializedView, String 
viewOwner,
+                                    ViewExpansionContext viewExpansionContext, 
String workspaceLocation) {
+    this.materializedView = materializedView;
+    this.viewOwner = viewOwner;
+    this.viewExpansionContext = viewExpansionContext;
+    this.workspaceLocation = workspaceLocation;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return materializedView.getRowType(typeFactory);
+  }
+
+  @Override
+  public Statistic getStatistic() {
+    return Statistics.UNKNOWN;
+  }
+
+  /**
+   * Converts this materialized view to a RelNode for query planning.
+   * <p>
+   * If the MV has been refreshed (data exists), scans from the pre-computed 
Parquet data.
+   * Otherwise, expands the SQL definition like a regular view.
+   */
+  @Override
+  public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
+    DrillViewExpander viewExpander = viewExpansionContext.getViewExpander();
+    ViewExpansionContext.ViewExpansionToken token = null;
+    try {
+      RelDataType rowType = relOptTable.getRowType();
+      RelNode rel;
+
+      // Check if materialized data exists (REFRESH has been called)
+      boolean hasData = materializedView.getRefreshStatus() == 
MaterializedView.RefreshStatus.COMPLETE;
+
+      // Build the SQL to execute - either scan data or expand definition
+      String sqlToExpand;
+      if (hasData) {
+        // Scan from the pre-computed data directory
+        sqlToExpand = buildDataScanSql();
+      } else {
+        // No data yet - expand the SQL definition like a view
+        sqlToExpand = materializedView.getSql();
+      }
+
+      // Always use the workspace schema path for context - needed for table 
resolution
+      List<String> schemaPath = materializedView.getWorkspaceSchemaPath();
+
+      if (viewExpansionContext.isImpersonationEnabled()) {
+        token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
+        rel = viewExpander.expandView(sqlToExpand, token.getSchemaTree(), 
schemaPath).rel;
+      } else {
+        // When scanning data, pass null for rowType to let Parquet schema be 
inferred
+        // When expanding SQL definition, use the MV's row type
+        RelDataType typeHint = hasData ? null : rowType;
+        rel = viewExpander.expandView(typeHint, sqlToExpand, schemaPath, 
Collections.emptyList()).rel;
+      }
+
+      return rel;
+    } finally {
+      if (token != null) {
+        token.release();
+      }
+    }
+  }
+
+  /**
+   * Builds SQL to scan the materialized data directory.
+   * The data is stored in {workspace}/{mvName}_mv_data/ directory.
+   * We explicitly select the MV's columns to ensure proper schema matching.
+   */
+  private String buildDataScanSql() {
+    String dataTableName = materializedView.getName() + "_mv_data";

Review Comment:
   I believe `materializedView.getDataStoragePath()` should be used there.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed
+    List<MaterializedViewCandidate> candidates = 
findCandidateMaterializedViews();
+
+    if (candidates.isEmpty()) {
+      logger.debug("No refreshed materialized views available for rewriting");
+      return queryRel;
+    }
+
+    logger.debug("Found {} materialized view candidates for potential 
rewriting", candidates.size());
+
+    // Build Calcite RelOptMaterialization objects for each refreshed candidate
+    List<RelOptMaterialization> materializations = new ArrayList<>();
+    for (MaterializedViewCandidate candidate : candidates) {
+      if (!candidate.isRefreshed()) {
+        logger.debug("Skipping MV {} - not refreshed", candidate.getName());
+        continue;
+      }
+
+      try {
+        RelOptMaterialization mat = buildMaterialization(candidate);
+        if (mat != null) {
+          materializations.add(mat);
+        }
+      } catch (Exception e) {
+        logger.debug("Failed to build materialization for MV {}: {}", 
candidate.getName(), e.getMessage());
+      }
+    }
+
+    if (materializations.isEmpty()) {
+      logger.debug("No valid materializations could be built");
+      return queryRel;
+    }
+
+    // Use Calcite's materialized view matching API which normalizes both the
+    // query and MV definitions (trimming unused fields, converting 
Filter/Project
+    // to Calc, merging, etc.) before performing structural matching.
+    try {
+      List<Pair<RelNode, List<RelOptMaterialization>>> results =
+          RelOptMaterializations.useMaterializedViews(queryRel, 
materializations);
+
+      if (!results.isEmpty()) {
+        RelNode rewritten = results.get(0).left;
+        if (logger.isInfoEnabled()) {
+          List<RelOptMaterialization> usedMVs = results.get(0).right;
+          logger.info("Query rewritten to use materialized view(s): {}",
+              !usedMVs.isEmpty() ? usedMVs.get(0).qualifiedTableName : 
"unknown");
+        }
+        return rewritten;
+      }
+    } catch (Exception e) {
+      logger.debug("Materialized view rewriting failed: {}", e.getMessage());
+    }
+
+    logger.debug("No materialized view matched the query");
+    return queryRel;
+  }
+
+  /**
+   * Builds a Calcite {@link RelOptMaterialization} for a candidate MV.
+   */
+  private RelOptMaterialization buildMaterialization(MaterializedViewCandidate 
candidate) {
+    RelNode mvQueryRel = parseMvSql(candidate);
+    if (mvQueryRel == null) {
+      return null;
+    }
+
+    RelNode mvTableRel = buildMvScanRel(candidate);
+    if (mvTableRel == null) {
+      return null;
+    }
+
+    List<String> qualifiedTableName = java.util.Arrays.asList(
+        candidate.getSchemaPath().split("\\."));
+
+    return new RelOptMaterialization(
+        mvTableRel,
+        mvQueryRel,
+        null,
+        qualifiedTableName
+    );
+  }
+
+  /**
+   * Parses the MV's SQL definition into a RelNode.
+   */
+  private RelNode parseMvSql(MaterializedViewCandidate candidate) {
+    try {
+      String mvSql = candidate.getSql();
+      org.apache.calcite.sql.SqlNode parsedNode = sqlConverter.parse(mvSql);
+      org.apache.calcite.sql.SqlNode validatedNode = 
sqlConverter.validate(parsedNode);
+      RelRoot relRoot = sqlConverter.toRel(validatedNode);
+      return relRoot.rel;
+    } catch (Exception e) {
+      logger.debug("Failed to parse MV SQL for {}: {}", candidate.getName(), 
e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Builds a RelNode that scans the MV's pre-computed data.
+   */
+  private RelNode buildMvScanRel(MaterializedViewCandidate candidate) {
+    try {
+      String mvDataTable = candidate.getSchemaPath() + ".`" + 
candidate.getName() + "_mv_data`";

Review Comment:
   I believe `materializedView.getDataStoragePath()` should be used there. 
   
   And also:
   > Won't it fail if `planner.parser.quoting_identifiers` is set to double 
quotes?
       https://drill.apache.org/docs/lexical-structure#identifier-quotes
   
   



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MaterializedViewHandler.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillWriterRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.SchemaUtilities;
+import org.apache.drill.exec.planner.sql.parser.SqlCreateMaterializedView;
+import org.apache.drill.exec.planner.sql.parser.SqlCreateType;
+import org.apache.drill.exec.planner.sql.parser.SqlDropMaterializedView;
+import org.apache.drill.exec.planner.sql.parser.SqlRefreshMaterializedView;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
+
+/**
+ * Handlers for materialized view DDL commands: CREATE, DROP, and REFRESH 
MATERIALIZED VIEW.
+ * <p>
+ * CREATE and DROP return DirectPlan with ok/summary output.
+ * REFRESH executes the MV query and writes data to Parquet, returning write 
statistics.
+ */
+public abstract class MaterializedViewHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MaterializedViewHandler.class);
+
+  protected QueryContext context;
+
+  public MaterializedViewHandler(SqlHandlerConfig config) {
+    super(config);
+    this.context = config.getContext();
+  }
+
+  /**
+   * Handler for CREATE MATERIALIZED VIEW DDL command.
+   * <p>
+   * Creates the MV definition file. The data will be materialized on first 
query
+   * or can be explicitly populated via REFRESH MATERIALIZED VIEW.
+   */
+  public static class CreateMaterializedView extends MaterializedViewHandler {
+
+    public CreateMaterializedView(SqlHandlerConfig config) {
+      super(config);
+    }
+
+    @Override
+    public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, 
RelConversionException,
+        IOException, ForemanSetupException {
+      SqlCreateMaterializedView createMV = unwrap(sqlNode, 
SqlCreateMaterializedView.class);
+
+      final String newViewName = 
DrillStringUtils.removeLeadingSlash(createMV.getName());
+
+      // Disallow temporary tables usage in materialized view definition
+      config.getConverter().disallowTemporaryTables();
+
+      // Store the SQL as the view definition
+      final String viewSql = createMV.getQuery().toSqlString(null, 
true).getSql();
+      final ConvertedRelNode convertedRelNode = 
validateAndConvert(createMV.getQuery());
+      final RelDataType validatedRowType = 
convertedRelNode.getValidatedRowType();
+      final RelNode queryRelNode = convertedRelNode.getConvertedNode();
+
+      final RelNode newViewRelNode = SqlHandlerUtil.resolveNewTableRel(true, 
createMV.getFieldNames(),
+          validatedRowType, queryRelNode);
+
+      final SchemaPlus defaultSchema = context.getNewDefaultSchema();
+      final AbstractSchema drillSchema = 
SchemaUtilities.resolveToMutableDrillSchema(defaultSchema,
+          createMV.getSchemaPath());
+
+      final String schemaPath = drillSchema.getFullSchemaName();
+
+      // Check view creation possibility
+      if (!checkMaterializedViewCreationPossibility(drillSchema, createMV, 
context)) {
+        return DirectPlan.createDirectPlan(context, false,
+            String.format("A table or view with given name [%s] already exists 
in schema [%s]",
+                newViewName, schemaPath));
+      }
+
+      // Create the materialized view definition
+      // Use the actual schema path where the MV is created (not the session's 
default schema)
+      final MaterializedView materializedView = new 
MaterializedView(newViewName, viewSql,
+          newViewRelNode.getRowType(), drillSchema.getSchemaPath());
+
+      // Create the materialized view definition file
+      final boolean replaced = 
drillSchema.createMaterializedView(materializedView);
+
+      String message = replaced
+          ? String.format("Materialized view '%s' replaced successfully in 
'%s' schema", newViewName, schemaPath)
+          : String.format("Materialized view '%s' created successfully in '%s' 
schema", newViewName, schemaPath);
+
+      logger.info("Created materialized view [{}] in schema [{}]", 
newViewName, schemaPath);
+      return DirectPlan.createDirectPlan(context, true, message);
+    }
+
+    /**
+     * Validates if materialized view can be created in indicated schema.
+     */
+    private boolean checkMaterializedViewCreationPossibility(AbstractSchema 
drillSchema,
+                                                              
SqlCreateMaterializedView createMV,
+                                                              QueryContext 
context) {
+      final String schemaPath = drillSchema.getFullSchemaName();
+      final String viewName = createMV.getName();
+      final Table table = SqlHandlerUtil.getTableFromSchema(drillSchema, 
viewName);
+
+      // Check if it's a materialized view
+      final boolean isMaterializedView = table != null &&
+          table.getJdbcTableType() == Schema.TableType.MATERIALIZED_VIEW;
+      final boolean isView = (table != null && table.getJdbcTableType() == 
Schema.TableType.VIEW);
+      // Regular table check excludes views and materialized views
+      final boolean isTable = (table != null
+          && table.getJdbcTableType() != Schema.TableType.VIEW
+          && table.getJdbcTableType() != Schema.TableType.MATERIALIZED_VIEW)
+          || context.getSession().isTemporaryTable(drillSchema, 
context.getConfig(), viewName);

Review Comment:
   Since we already verified whether the table is materialized or a simple 
view, we can rewrite it with:
   ```java
   final boolean isTable = (!isView && !isMaterializedView) 
       || context.getSession().isTemporaryTable(drillSchema, 
context.getConfig(), viewName);
   ```



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed
+    List<MaterializedViewCandidate> candidates = 
findCandidateMaterializedViews();
+
+    if (candidates.isEmpty()) {
+      logger.debug("No refreshed materialized views available for rewriting");
+      return queryRel;
+    }

Review Comment:
   Since `findCandidateMaterializedViews()` returns both refreshed and 
unrefreshed MVs, the log message is misleading. 



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java:
##########
@@ -351,6 +354,278 @@ public void dropView(String viewName) throws IOException {
       getFS().delete(getViewPath(viewName), false);
     }
 
+    private Path getMaterializedViewPath(String name) {
+      return DotDrillType.MATERIALIZED_VIEW.getPath(config.getLocation(), 
name);
+    }
+
+    private Path getMaterializedViewDataPath(String name) {
+      // Use _mv_data suffix to distinguish data directory from MV definition 
lookup
+      return new Path(config.getLocation(), name + "_mv_data");
+    }
+
+    @Override
+    public boolean createMaterializedView(MaterializedView materializedView) 
throws IOException {
+      String viewName = materializedView.getName();
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      boolean replaced = getFS().exists(viewPath);
+
+      // If replacing, first drop the old data
+      if (replaced) {
+        if (getFS().exists(dataPath)) {
+          getFS().delete(dataPath, true);
+        }
+      }
+
+      // Create the data directory for the materialized view
+      final FsPermission dirPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      getFS().mkdirs(dataPath, dirPerms);
+
+      // Set the data storage path in the materialized view
+      materializedView.setDataStoragePath(viewName);
+
+      // Write the materialized view definition file
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, materializedView);
+      }
+
+      // Sync to metastore if enabled
+      syncMaterializedViewToMetastore(materializedView);
+
+      // Mark as complete (data will be populated by the handler via CTAS-like 
operation)
+      return replaced;
+    }
+
+    @Override
+    public void dropMaterializedView(String viewName) throws IOException {
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      // Delete the definition file
+      if (getFS().exists(viewPath)) {
+        getFS().delete(viewPath, false);
+      }
+
+      // Delete the data directory
+      if (getFS().exists(dataPath)) {
+        getFS().delete(dataPath, true);
+      }
+
+      // Remove from metastore if enabled
+      removeMaterializedViewFromMetastore(viewName);
+    }
+
+    @Override
+    public void refreshMaterializedView(String viewName) throws IOException {
+      // Read the existing materialized view definition
+      MaterializedView mv = getMaterializedView(viewName);
+      if (mv == null) {
+        throw UserException.validationError()
+            .message("Materialized view [%s] not found in schema [%s]", 
viewName, getFullSchemaName())
+            .build(logger);
+      }
+
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      // Delete existing data
+      if (getFS().exists(dataPath)) {
+        getFS().delete(dataPath, true);
+      }
+
+      // Recreate the data directory
+      final FsPermission dirPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      getFS().mkdirs(dataPath, dirPerms);
+
+      // Mark as INCOMPLETE while data is being refreshed.
+      // completeMaterializedViewRefresh() should be called after data is 
fully written.
+      MaterializedView updatedMV = mv.withRefreshInfo(
+          mv.getLastRefreshTime(),
+          MaterializedView.RefreshStatus.INCOMPLETE);
+
+      // Write the updated definition file
+      Path viewPath = getMaterializedViewPath(viewName);
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, updatedMV);
+      }
+
+      // Sync updated metadata to metastore if enabled
+      syncMaterializedViewToMetastore(updatedMV);
+    }
+
+    @Override
+    public void completeMaterializedViewRefresh(String viewName) throws 
IOException {
+      MaterializedView mv = getMaterializedView(viewName);
+      if (mv == null) {
+        throw UserException.validationError()
+            .message("Materialized view [%s] not found in schema [%s]", 
viewName, getFullSchemaName())
+            .build(logger);
+      }
+
+      // Mark as COMPLETE with current timestamp now that data is fully written
+      MaterializedView updatedMV = mv.withRefreshInfo(
+          System.currentTimeMillis(),
+          MaterializedView.RefreshStatus.COMPLETE);
+
+      Path viewPath = getMaterializedViewPath(viewName);
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, updatedMV);
+      }
+
+      syncMaterializedViewToMetastore(updatedMV);
+    }
+
+    @Override
+    public CreateTableEntry createMaterializedViewDataWriter(String viewName) {
+      // Use Parquet format for storing materialized view data
+      FormatPlugin formatPlugin = plugin.getFormatPlugin("parquet");
+      if (formatPlugin == null) {
+        throw UserException.unsupportedError()
+            .message("Parquet format plugin not available for materialized 
view storage")
+            .build(logger);
+      }
+
+      // Store data in a directory with _mv_data suffix to avoid name collision
+      // with the materialized view lookup (which uses the same base name)
+      String dataLocation = config.getLocation() + Path.SEPARATOR + viewName + 
"_mv_data";
+      return new FileSystemCreateTableEntry(
+          (FileSystemConfig) plugin.getConfig(),
+          formatPlugin,
+          dataLocation,
+          Collections.emptyList(),  // No partition columns for MVs
+          StorageStrategy.DEFAULT);
+    }

Review Comment:
   I think, the method should have `MaterializedView` parameter and call 
`materializedView.getDataStoragePath()` to retrieve the `dataLocation`. Or at 
least this method shouldn't try to guess the data path and get the date path as 
argument.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed
+    List<MaterializedViewCandidate> candidates = 
findCandidateMaterializedViews();
+
+    if (candidates.isEmpty()) {
+      logger.debug("No refreshed materialized views available for rewriting");
+      return queryRel;
+    }
+
+    logger.debug("Found {} materialized view candidates for potential 
rewriting", candidates.size());
+
+    // Build Calcite RelOptMaterialization objects for each refreshed candidate
+    List<RelOptMaterialization> materializations = new ArrayList<>();
+    for (MaterializedViewCandidate candidate : candidates) {
+      if (!candidate.isRefreshed()) {
+        logger.debug("Skipping MV {} - not refreshed", candidate.getName());
+        continue;
+      }
+
+      try {
+        RelOptMaterialization mat = buildMaterialization(candidate);
+        if (mat != null) {
+          materializations.add(mat);
+        }
+      } catch (Exception e) {
+        logger.debug("Failed to build materialization for MV {}: {}", 
candidate.getName(), e.getMessage());
+      }
+    }
+
+    if (materializations.isEmpty()) {
+      logger.debug("No valid materializations could be built");
+      return queryRel;
+    }
+
+    // Use Calcite's materialized view matching API which normalizes both the
+    // query and MV definitions (trimming unused fields, converting 
Filter/Project
+    // to Calc, merging, etc.) before performing structural matching.
+    try {
+      List<Pair<RelNode, List<RelOptMaterialization>>> results =
+          RelOptMaterializations.useMaterializedViews(queryRel, 
materializations);
+
+      if (!results.isEmpty()) {
+        RelNode rewritten = results.get(0).left;
+        if (logger.isInfoEnabled()) {
+          List<RelOptMaterialization> usedMVs = results.get(0).right;
+          logger.info("Query rewritten to use materialized view(s): {}",
+              !usedMVs.isEmpty() ? usedMVs.get(0).qualifiedTableName : 
"unknown");
+        }
+        return rewritten;
+      }
+    } catch (Exception e) {
+      logger.debug("Materialized view rewriting failed: {}", e.getMessage());
+    }
+
+    logger.debug("No materialized view matched the query");
+    return queryRel;
+  }
+
+  /**
+   * Builds a Calcite {@link RelOptMaterialization} for a candidate MV.
+   */
+  private RelOptMaterialization buildMaterialization(MaterializedViewCandidate 
candidate) {
+    RelNode mvQueryRel = parseMvSql(candidate);
+    if (mvQueryRel == null) {
+      return null;
+    }
+
+    RelNode mvTableRel = buildMvScanRel(candidate);
+    if (mvTableRel == null) {
+      return null;
+    }
+
+    List<String> qualifiedTableName = java.util.Arrays.asList(
+        candidate.getSchemaPath().split("\\."));
+
+    return new RelOptMaterialization(
+        mvTableRel,
+        mvQueryRel,
+        null,
+        qualifiedTableName
+    );
+  }
+
+  /**
+   * Parses the MV's SQL definition into a RelNode.
+   */
+  private RelNode parseMvSql(MaterializedViewCandidate candidate) {
+    try {
+      String mvSql = candidate.getSql();
+      org.apache.calcite.sql.SqlNode parsedNode = sqlConverter.parse(mvSql);
+      org.apache.calcite.sql.SqlNode validatedNode = 
sqlConverter.validate(parsedNode);
+      RelRoot relRoot = sqlConverter.toRel(validatedNode);
+      return relRoot.rel;
+    } catch (Exception e) {
+      logger.debug("Failed to parse MV SQL for {}: {}", candidate.getName(), 
e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Builds a RelNode that scans the MV's pre-computed data.
+   */
+  private RelNode buildMvScanRel(MaterializedViewCandidate candidate) {
+    try {
+      String mvDataTable = candidate.getSchemaPath() + ".`" + 
candidate.getName() + "_mv_data`";
+
+      // Build explicit column list from the MV's field definitions to avoid
+      // DYNAMIC_STAR type issues with SELECT *
+      List<org.apache.drill.exec.dotdrill.View.Field> fields = 
candidate.getMaterializedView().getFields();
+      StringBuilder scanSql = new StringBuilder("SELECT ");
+      if (fields != null && !fields.isEmpty()) {
+        for (int i = 0; i < fields.size(); i++) {
+          if (i > 0) {
+            scanSql.append(", ");
+          }
+          scanSql.append("`").append(fields.get(i).getName()).append("`");
+        }
+      } else {
+        scanSql.append("*");
+      }
+      scanSql.append(" FROM ").append(mvDataTable);
+
+      org.apache.calcite.sql.SqlNode parsedNode = 
sqlConverter.parse(scanSql.toString());
+      org.apache.calcite.sql.SqlNode validatedNode = 
sqlConverter.validate(parsedNode);

Review Comment:
   Is the fully qualified name used intentionally?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java:
##########
@@ -351,6 +354,278 @@ public void dropView(String viewName) throws IOException {
       getFS().delete(getViewPath(viewName), false);
     }
 
+    private Path getMaterializedViewPath(String name) {
+      return DotDrillType.MATERIALIZED_VIEW.getPath(config.getLocation(), 
name);
+    }
+
+    private Path getMaterializedViewDataPath(String name) {
+      // Use _mv_data suffix to distinguish data directory from MV definition 
lookup
+      return new Path(config.getLocation(), name + "_mv_data");
+    }
+
+    @Override
+    public boolean createMaterializedView(MaterializedView materializedView) 
throws IOException {
+      String viewName = materializedView.getName();
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      boolean replaced = getFS().exists(viewPath);
+
+      // If replacing, first drop the old data
+      if (replaced) {
+        if (getFS().exists(dataPath)) {
+          getFS().delete(dataPath, true);
+        }
+      }
+
+      // Create the data directory for the materialized view
+      final FsPermission dirPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      getFS().mkdirs(dataPath, dirPerms);
+
+      // Set the data storage path in the materialized view
+      materializedView.setDataStoragePath(viewName);
+
+      // Write the materialized view definition file
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, materializedView);
+      }
+
+      // Sync to metastore if enabled
+      syncMaterializedViewToMetastore(materializedView);
+
+      // Mark as complete (data will be populated by the handler via CTAS-like 
operation)
+      return replaced;
+    }
+
+    @Override
+    public void dropMaterializedView(String viewName) throws IOException {
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);

Review Comment:
   Use `materializedView.getDataStoragePath()` instead.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java:
##########
@@ -351,6 +354,278 @@ public void dropView(String viewName) throws IOException {
       getFS().delete(getViewPath(viewName), false);
     }
 
+    private Path getMaterializedViewPath(String name) {
+      return DotDrillType.MATERIALIZED_VIEW.getPath(config.getLocation(), 
name);
+    }
+
+    private Path getMaterializedViewDataPath(String name) {
+      // Use _mv_data suffix to distinguish data directory from MV definition 
lookup
+      return new Path(config.getLocation(), name + "_mv_data");
+    }

Review Comment:
   Drop it. Use `materializedView.getDataStoragePath()` instead.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/MaterializedViewRewriter.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptMaterializations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.dotdrill.MaterializedView;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for materialized view query rewriting.
+ * <p>
+ * When enabled via planner.enable_materialized_view_rewrite, this class 
attempts
+ * to rewrite queries to use materialized views when beneficial.
+ * <p>
+ * Uses Calcite's {@link RelOptMaterializations#useMaterializedViews} API which
+ * normalizes both the query and MV definitions before performing structural
+ * matching via {@link org.apache.calcite.plan.SubstitutionVisitor}.
+ * <p>
+ * Materialized views are discovered by iterating over enabled file-based
+ * storage plugins (the only plugin type that supports MVs) and force-loading
+ * their schemas to find .materialized_view.drill files.
+ */
+public class MaterializedViewRewriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MaterializedViewRewriter.class);
+
+  private final QueryContext context;
+  private final SchemaPlus rootSchema;
+  private final SqlConverter sqlConverter;
+
+  public MaterializedViewRewriter(QueryContext context, SchemaPlus rootSchema, 
SqlConverter sqlConverter) {
+    this.context = context;
+    this.rootSchema = rootSchema;
+    this.sqlConverter = sqlConverter;
+  }
+
+  /**
+   * Attempts to rewrite the given RelNode to use a materialized view.
+   *
+   * @param queryRel the query plan to potentially rewrite
+   * @return the rewritten plan using an MV, or the original plan if no 
rewrite is possible
+   */
+  public RelNode rewrite(RelNode queryRel) {
+    if (!context.getPlannerSettings().isMaterializedViewRewriteEnabled()) {
+      return queryRel;
+    }
+
+    // Find all available materialized views that have been refreshed
+    List<MaterializedViewCandidate> candidates = 
findCandidateMaterializedViews();
+
+    if (candidates.isEmpty()) {
+      logger.debug("No refreshed materialized views available for rewriting");
+      return queryRel;
+    }
+
+    logger.debug("Found {} materialized view candidates for potential 
rewriting", candidates.size());
+
+    // Build Calcite RelOptMaterialization objects for each refreshed candidate
+    List<RelOptMaterialization> materializations = new ArrayList<>();
+    for (MaterializedViewCandidate candidate : candidates) {
+      if (!candidate.isRefreshed()) {
+        logger.debug("Skipping MV {} - not refreshed", candidate.getName());
+        continue;
+      }
+
+      try {
+        RelOptMaterialization mat = buildMaterialization(candidate);
+        if (mat != null) {
+          materializations.add(mat);
+        }
+      } catch (Exception e) {
+        logger.debug("Failed to build materialization for MV {}: {}", 
candidate.getName(), e.getMessage());
+      }
+    }
+
+    if (materializations.isEmpty()) {
+      logger.debug("No valid materializations could be built");
+      return queryRel;
+    }
+
+    // Use Calcite's materialized view matching API which normalizes both the
+    // query and MV definitions (trimming unused fields, converting 
Filter/Project
+    // to Calc, merging, etc.) before performing structural matching.
+    try {
+      List<Pair<RelNode, List<RelOptMaterialization>>> results =
+          RelOptMaterializations.useMaterializedViews(queryRel, 
materializations);
+
+      if (!results.isEmpty()) {
+        RelNode rewritten = results.get(0).left;
+        if (logger.isInfoEnabled()) {
+          List<RelOptMaterialization> usedMVs = results.get(0).right;
+          logger.info("Query rewritten to use materialized view(s): {}",
+              !usedMVs.isEmpty() ? usedMVs.get(0).qualifiedTableName : 
"unknown");
+        }
+        return rewritten;
+      }
+    } catch (Exception e) {
+      logger.debug("Materialized view rewriting failed: {}", e.getMessage());
+    }
+
+    logger.debug("No materialized view matched the query");
+    return queryRel;
+  }
+
+  /**
+   * Builds a Calcite {@link RelOptMaterialization} for a candidate MV.
+   */
+  private RelOptMaterialization buildMaterialization(MaterializedViewCandidate 
candidate) {
+    RelNode mvQueryRel = parseMvSql(candidate);
+    if (mvQueryRel == null) {
+      return null;
+    }
+
+    RelNode mvTableRel = buildMvScanRel(candidate);
+    if (mvTableRel == null) {
+      return null;
+    }
+
+    List<String> qualifiedTableName = java.util.Arrays.asList(
+        candidate.getSchemaPath().split("\\."));
+
+    return new RelOptMaterialization(
+        mvTableRel,
+        mvQueryRel,
+        null,
+        qualifiedTableName
+    );
+  }
+
+  /**
+   * Parses the MV's SQL definition into a RelNode.
+   */
+  private RelNode parseMvSql(MaterializedViewCandidate candidate) {
+    try {
+      String mvSql = candidate.getSql();
+      org.apache.calcite.sql.SqlNode parsedNode = sqlConverter.parse(mvSql);
+      org.apache.calcite.sql.SqlNode validatedNode = 
sqlConverter.validate(parsedNode);
+      RelRoot relRoot = sqlConverter.toRel(validatedNode);
+      return relRoot.rel;
+    } catch (Exception e) {
+      logger.debug("Failed to parse MV SQL for {}: {}", candidate.getName(), 
e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Builds a RelNode that scans the MV's pre-computed data.
+   */
+  private RelNode buildMvScanRel(MaterializedViewCandidate candidate) {
+    try {
+      String mvDataTable = candidate.getSchemaPath() + ".`" + 
candidate.getName() + "_mv_data`";
+
+      // Build explicit column list from the MV's field definitions to avoid
+      // DYNAMIC_STAR type issues with SELECT *
+      List<org.apache.drill.exec.dotdrill.View.Field> fields = 
candidate.getMaterializedView().getFields();
+      StringBuilder scanSql = new StringBuilder("SELECT ");
+      if (fields != null && !fields.isEmpty()) {
+        for (int i = 0; i < fields.size(); i++) {
+          if (i > 0) {
+            scanSql.append(", ");
+          }
+          scanSql.append("`").append(fields.get(i).getName()).append("`");
+        }
+      } else {
+        scanSql.append("*");
+      }
+      scanSql.append(" FROM ").append(mvDataTable);
+
+      org.apache.calcite.sql.SqlNode parsedNode = 
sqlConverter.parse(scanSql.toString());
+      org.apache.calcite.sql.SqlNode validatedNode = 
sqlConverter.validate(parsedNode);
+      RelRoot relRoot = sqlConverter.toRel(validatedNode);
+      return relRoot.rel;
+    } catch (Exception e) {
+      logger.debug("Failed to build MV scan for {}: {}", candidate.getName(), 
e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Finds all materialized views by iterating over enabled file-based storage
+   * plugins. Only file-based plugins (FileSystemPlugin) support materialized 
views.
+   * <p>
+   * Because Drill's schema tree is lazily loaded, we cannot simply traverse
+   * already-loaded schemas. Instead, we use the StoragePluginRegistry to
+   * discover file-based plugins and force-load their workspace schemas.
+   */
+  private List<MaterializedViewCandidate> findCandidateMaterializedViews() {
+    List<MaterializedViewCandidate> candidates = new ArrayList<>();
+    StoragePluginRegistry pluginRegistry = context.getStorage();
+
+    // Get all enabled storage plugin names
+    Map<String, org.apache.drill.common.logical.StoragePluginConfig> 
enabledPlugins =
+        
pluginRegistry.storedConfigs(StoragePluginRegistry.PluginFilter.ENABLED);
+
+    for (String pluginName : enabledPlugins.keySet()) {
+      try {
+        StoragePlugin plugin = pluginRegistry.getPlugin(pluginName);
+        if (!(plugin instanceof FileSystemPlugin)) {
+          continue;
+        }
+
+        // Force-load this plugin's schema into the root schema tree.
+        // DynamicRootSchema lazily loads schemas only on getSubSchema() calls.
+        SchemaPlus pluginSchema = rootSchema.getSubSchema(pluginName);
+        if (pluginSchema == null) {
+          continue;
+        }
+
+        // Iterate over workspaces (sub-schemas of the plugin)
+        for (String workspaceName : pluginSchema.getSubSchemaNames()) {
+          SchemaPlus workspaceSchema = 
pluginSchema.getSubSchema(workspaceName);
+          if (workspaceSchema == null) {
+            continue;
+          }
+
+          AbstractSchema abstractSchema = 
workspaceSchema.unwrap(AbstractSchema.class);
+          if (abstractSchema == null) {
+            continue;
+          }
+
+          collectMaterializedViewsFromSchema(abstractSchema, candidates);
+        }
+      } catch (PluginException e) {
+        logger.debug("Error accessing plugin {}: {}", pluginName, 
e.getMessage());
+      }
+    }
+
+    return candidates;
+  }
+
+  /**
+   * Collects MVs from a specific workspace schema.
+   */
+  private void collectMaterializedViewsFromSchema(AbstractSchema schema,
+                                                   
List<MaterializedViewCandidate> candidates) {
+    Set<String> tableNames = schema.getTableNames();
+    for (String tableName : tableNames) {
+      try {
+        MaterializedView mv = schema.getMaterializedView(tableName);
+        if (mv != null) {
+          boolean isRefreshed = mv.getRefreshStatus() == 
MaterializedView.RefreshStatus.COMPLETE;
+          candidates.add(new MaterializedViewCandidate(
+              mv.getName(),
+              schema.getFullSchemaName(),
+              mv,
+              isRefreshed));
+        }
+      } catch (IOException e) {
+        logger.debug("Error reading MV {}: {}", tableName, e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Represents a candidate materialized view for query rewriting.
+   */
+  public static class MaterializedViewCandidate {
+    private final String name;
+    private final String schemaPath;
+    private final MaterializedView materializedView;
+    private final boolean refreshed;

Review Comment:
   Did you intentionally leave `MaterializedView.RefreshStatus` as ENUM with 2 
possible values instead of a boolean? Because, logically, it can be easily 
replaced with `boolean refreshed`, as you did there.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java:
##########
@@ -351,6 +354,278 @@ public void dropView(String viewName) throws IOException {
       getFS().delete(getViewPath(viewName), false);
     }
 
+    private Path getMaterializedViewPath(String name) {
+      return DotDrillType.MATERIALIZED_VIEW.getPath(config.getLocation(), 
name);
+    }
+
+    private Path getMaterializedViewDataPath(String name) {
+      // Use _mv_data suffix to distinguish data directory from MV definition 
lookup
+      return new Path(config.getLocation(), name + "_mv_data");
+    }
+
+    @Override
+    public boolean createMaterializedView(MaterializedView materializedView) 
throws IOException {
+      String viewName = materializedView.getName();
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      boolean replaced = getFS().exists(viewPath);
+
+      // If replacing, first drop the old data
+      if (replaced) {
+        if (getFS().exists(dataPath)) {
+          getFS().delete(dataPath, true);
+        }
+      }
+
+      // Create the data directory for the materialized view
+      final FsPermission dirPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      getFS().mkdirs(dataPath, dirPerms);
+
+      // Set the data storage path in the materialized view
+      materializedView.setDataStoragePath(viewName);
+
+      // Write the materialized view definition file
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, materializedView);
+      }
+
+      // Sync to metastore if enabled
+      syncMaterializedViewToMetastore(materializedView);
+
+      // Mark as complete (data will be populated by the handler via CTAS-like 
operation)
+      return replaced;
+    }
+
+    @Override
+    public void dropMaterializedView(String viewName) throws IOException {
+      Path viewPath = getMaterializedViewPath(viewName);
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      // Delete the definition file
+      if (getFS().exists(viewPath)) {
+        getFS().delete(viewPath, false);
+      }
+
+      // Delete the data directory
+      if (getFS().exists(dataPath)) {
+        getFS().delete(dataPath, true);
+      }
+
+      // Remove from metastore if enabled
+      removeMaterializedViewFromMetastore(viewName);
+    }
+
+    @Override
+    public void refreshMaterializedView(String viewName) throws IOException {
+      // Read the existing materialized view definition
+      MaterializedView mv = getMaterializedView(viewName);
+      if (mv == null) {
+        throw UserException.validationError()
+            .message("Materialized view [%s] not found in schema [%s]", 
viewName, getFullSchemaName())
+            .build(logger);
+      }
+
+      Path dataPath = getMaterializedViewDataPath(viewName);
+
+      // Delete existing data
+      if (getFS().exists(dataPath)) {
+        getFS().delete(dataPath, true);
+      }
+
+      // Recreate the data directory
+      final FsPermission dirPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      getFS().mkdirs(dataPath, dirPerms);
+
+      // Mark as INCOMPLETE while data is being refreshed.
+      // completeMaterializedViewRefresh() should be called after data is 
fully written.
+      MaterializedView updatedMV = mv.withRefreshInfo(
+          mv.getLastRefreshTime(),
+          MaterializedView.RefreshStatus.INCOMPLETE);
+
+      // Write the updated definition file
+      Path viewPath = getMaterializedViewPath(viewName);
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, updatedMV);
+      }
+
+      // Sync updated metadata to metastore if enabled
+      syncMaterializedViewToMetastore(updatedMV);
+    }
+
+    @Override
+    public void completeMaterializedViewRefresh(String viewName) throws 
IOException {
+      MaterializedView mv = getMaterializedView(viewName);
+      if (mv == null) {
+        throw UserException.validationError()
+            .message("Materialized view [%s] not found in schema [%s]", 
viewName, getFullSchemaName())
+            .build(logger);
+      }
+
+      // Mark as COMPLETE with current timestamp now that data is fully written
+      MaterializedView updatedMV = mv.withRefreshInfo(
+          System.currentTimeMillis(),
+          MaterializedView.RefreshStatus.COMPLETE);
+
+      Path viewPath = getMaterializedViewPath(viewName);
+      final FsPermission viewPerms = new FsPermission(
+          
schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, 
viewPerms)) {
+        mapper.writeValue(stream, updatedMV);
+      }
+
+      syncMaterializedViewToMetastore(updatedMV);
+    }
+
+    @Override
+    public CreateTableEntry createMaterializedViewDataWriter(String viewName) {
+      // Use Parquet format for storing materialized view data
+      FormatPlugin formatPlugin = plugin.getFormatPlugin("parquet");
+      if (formatPlugin == null) {
+        throw UserException.unsupportedError()
+            .message("Parquet format plugin not available for materialized 
view storage")
+            .build(logger);
+      }
+
+      // Store data in a directory with _mv_data suffix to avoid name collision
+      // with the materialized view lookup (which uses the same base name)
+      String dataLocation = config.getLocation() + Path.SEPARATOR + viewName + 
"_mv_data";
+      return new FileSystemCreateTableEntry(
+          (FileSystemConfig) plugin.getConfig(),
+          formatPlugin,
+          dataLocation,
+          Collections.emptyList(),  // No partition columns for MVs
+          StorageStrategy.DEFAULT);
+    }
+
+    @Override
+    public MaterializedView getMaterializedView(String viewName) throws 
IOException {
+      List<DotDrillFile> files = Collections.emptyList();
+      try {
+        files = DotDrillUtil.getDotDrills(getFS(), new 
Path(config.getLocation()),
+            DrillStringUtils.removeLeadingSlash(viewName), 
DotDrillType.MATERIALIZED_VIEW);
+      } catch (UnsupportedOperationException e) {
+        logger.debug("The filesystem for this workspace does not support this 
operation.", e);
+        return null;
+      } catch (IOException e) {
+        logger.warn("Failure while trying to list materialized view in 
workspace [{}]", getFullSchemaName(), e);
+        return null;
+      }
+
+      for (DotDrillFile f : files) {
+        if (f.getType() == DotDrillType.MATERIALIZED_VIEW) {
+          return f.getMaterializedView(mapper);
+        }
+      }

Review Comment:
   Don't you want to replace it with a more declarative syntax to get first 
`MATERIALIZED_VIEW`:
   ```java
   return files.stream()
       .filter(f -> f.getType() == DotDrillType.MATERIALIZED_VIEW)
       .findFirst()
       .map(f -> f.getMaterializedView(mapper))
       .orElse(null);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to