This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch materialized-views
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit 7e96e347bb6f71052a779b84e6f9c324d6062f69
Author: Bertil Chapuis <[email protected]>
AuthorDate: Sun Jan 12 11:14:33 2025 +0100

    Refreshes materialized views
    
    Adds some utilities to refresh materialized views
    so that dependencies come before dependents.
---
 .../postgres/graph/DatabaseMetadataRetriever.java  | 222 +++++++++++++++++++++
 .../postgres/graph/DependencyGraphBuilder.java     | 111 +++++++++++
 .../postgres/graph/MaterializedViewRefresher.java  |  94 +++++++++
 .../apache/baremaps/postgres/graph/RefreshApp.java |  60 ++++++
 4 files changed, 487 insertions(+)

diff --git 
a/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/DatabaseMetadataRetriever.java
 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/DatabaseMetadataRetriever.java
new file mode 100644
index 000000000..8a8b1da60
--- /dev/null
+++ 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/DatabaseMetadataRetriever.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.postgres.graph;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to retrieve metadata about tables, views, and materialized
+ * views from the PostgreSQL system catalogs.
+ */
+class DatabaseMetadataRetriever {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DatabaseMetadataRetriever.class.getName());
+
+  private DatabaseMetadataRetriever() {
+    // Prevent instantiation
+  }
+
+  /**
+   * Retrieves all tables, views, and materialized views in the given schema.
+   *
+   * <p>Only relations whose {@code pg_class.relkind} is {@code r}, {@code v},
+   * or {@code m} are returned.
+   *
+   * @param connection the database connection
+   * @param schema the schema name
+   * @return a list of DatabaseObject records
+   * @throws SQLException if a database error occurs
+   * @throws IllegalStateException if the catalog returns a relkind other than
+   *         the three selected by the query
+   */
+  public static List<DatabaseObject> getObjects(Connection connection,
+      String schema) throws SQLException {
+    var result = new ArrayList<DatabaseObject>();
+
+    var sql = """
+            SELECT c.relname,
+                   c.relkind,
+                   n.nspname
+              FROM pg_class c
+              JOIN pg_namespace n ON n.oid = c.relnamespace
+             WHERE n.nspname = ?
+               AND c.relkind IN ('r', 'v', 'm')
+        """;
+
+    try (var ps = connection.prepareStatement(sql)) {
+      ps.setString(1, schema);
+      try (var rs = ps.executeQuery()) {
+        while (rs.next()) {
+          var relName = rs.getString("relname");
+          var relKind = rs.getString("relkind");
+          var nspName = rs.getString("nspname");
+
+          // The WHERE clause restricts relkind to these three values, so the
+          // default branch signals a mismatch between query and mapping
+          // instead of silently producing an object without a type.
+          var objectType = switch (relKind) {
+            case "r" -> ObjectType.TABLE;
+            case "v" -> ObjectType.VIEW;
+            case "m" -> ObjectType.MATERIALIZED_VIEW;
+            default -> throw new IllegalStateException(
+                "Unexpected relkind: " + relKind);
+          };
+
+          result.add(new DatabaseObject(nspName, relName, objectType));
+        }
+      }
+    }
+
+    LOGGER.info("Found {} objects in schema {}", result.size(), schema);
+    return result;
+  }
+
+  /**
+   * Retrieves dependencies between database objects in the given schema.
+   *
+   * <p>A dependency is reported when the rewrite rule of one relation (the
+   * dependent) references another relation (the source). Both relations must
+   * live in {@code schema} and must be present in {@code objects}; rows that
+   * cannot be resolved against {@code objects} are ignored, as are self-loops.
+   *
+   * @param connection the database connection
+   * @param schema the schema name
+   * @param objects a list of database objects used to resolve both endpoints
+   * @return a list of DatabaseDependency records
+   * @throws SQLException if a database error occurs
+   */
+  public static List<DatabaseDependency> getDependencies(
+      Connection connection, String schema, List<DatabaseObject> objects)
+      throws SQLException {
+    // OIDs are only unique within a single catalog, so pg_depend rows must be
+    // restricted to those whose dependent object is a pg_rewrite entry and
+    // whose referenced object is a pg_class entry. DISTINCT collapses the
+    // per-column rows that pg_depend records for the same pair of relations.
+    var sql = """
+            SELECT DISTINCT
+                   dependent_ns.nspname   AS dependent_schema,
+                   dependent_c.relname    AS dependent_name,
+                   source_ns.nspname      AS source_schema,
+                   source_c.relname       AS source_name
+              FROM pg_depend d
+              JOIN pg_rewrite r
+                ON r.oid = d.objid
+               AND d.classid = 'pg_catalog.pg_rewrite'::regclass
+              JOIN pg_class dependent_c
+                ON r.ev_class = dependent_c.oid
+              JOIN pg_namespace dependent_ns
+                ON dependent_c.relnamespace = dependent_ns.oid
+              JOIN pg_class source_c
+                ON d.refobjid = source_c.oid
+               AND d.refclassid = 'pg_catalog.pg_class'::regclass
+              JOIN pg_namespace source_ns
+                ON source_c.relnamespace = source_ns.oid
+             WHERE dependent_ns.nspname = ?
+               AND source_ns.nspname = ?
+        """;
+
+    // Create a fast lookup by (schema, name). A structured key avoids the
+    // ambiguity of concatenating names that may themselves contain dots.
+    var lookup = new HashMap<QualifiedName, DatabaseObject>();
+    for (var obj : objects) {
+      lookup.put(new QualifiedName(obj.schemaName(), obj.objectName()), obj);
+    }
+
+    var result = new ArrayList<DatabaseDependency>();
+    try (var ps = connection.prepareStatement(sql)) {
+      ps.setString(1, schema);
+      ps.setString(2, schema);
+      try (var rs = ps.executeQuery()) {
+        while (rs.next()) {
+          var dependentSchema = rs.getString("dependent_schema");
+          var dependentName = rs.getString("dependent_name");
+          var sourceSchema = rs.getString("source_schema");
+          var sourceName = rs.getString("source_name");
+
+          var dependentObj =
+              lookup.get(new QualifiedName(dependentSchema, dependentName));
+          var sourceObj =
+              lookup.get(new QualifiedName(sourceSchema, sourceName));
+
+          if (dependentObj == null || sourceObj == null) {
+            continue;
+          }
+          // Skip self-loop dependencies; they carry no ordering information.
+          if (dependentObj.equals(sourceObj)) {
+            continue;
+          }
+          result.add(new DatabaseDependency(sourceObj, dependentObj));
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Retrieves indexes for the given table or materialized view.
+   *
+   * @param connection the database connection
+   * @param schema the schema name
+   * @param tableName the table or materialized view name
+   * @return a list of DatabaseIndex records
+   * @throws SQLException if a database error occurs
+   */
+  public static List<DatabaseIndex> getIndexes(Connection connection,
+      String schema, String tableName) throws SQLException {
+    var sql = """
+            SELECT indexname, indexdef
+              FROM pg_indexes
+             WHERE schemaname = ?
+               AND tablename  = ?
+        """;
+
+    var result = new ArrayList<DatabaseIndex>();
+    try (var ps = connection.prepareStatement(sql)) {
+      ps.setString(1, schema);
+      ps.setString(2, tableName);
+      try (var rs = ps.executeQuery()) {
+        while (rs.next()) {
+          var indexName = rs.getString("indexname");
+          var indexDef = rs.getString("indexdef");
+          result.add(new DatabaseIndex(indexName, indexDef));
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Represents the type of database object.
+   */
+  public enum ObjectType {
+    TABLE,
+    VIEW,
+    MATERIALIZED_VIEW
+  }
+
+  /**
+   * A record representing a database object (table, view, materialized view).
+   */
+  public record DatabaseObject(
+      String schemaName,
+      String objectName,
+      ObjectType objectType) {
+
+  }
+
+  /**
+   * Record representing a dependency between two database objects: the
+   * {@code dependent} is defined in terms of the {@code source}.
+   */
+  record DatabaseDependency(DatabaseObject source, DatabaseObject dependent) {
+
+  }
+
+
+  /**
+   * Record representing a database index: its name and the statement that
+   * creates it, as reported by {@code pg_indexes}.
+   */
+  record DatabaseIndex(String indexName, String indexDef) {
+  }
+
+  /**
+   * Lookup key identifying a relation by schema and name.
+   */
+  private record QualifiedName(String schemaName, String objectName) {
+  }
+
+}
diff --git 
a/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/DependencyGraphBuilder.java
 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/DependencyGraphBuilder.java
new file mode 100644
index 000000000..ef2e118f7
--- /dev/null
+++ 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/DependencyGraphBuilder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.postgres.graph;
+
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.baremaps.postgres.graph.DatabaseMetadataRetriever.DatabaseDependency;
+import org.apache.baremaps.postgres.graph.DatabaseMetadataRetriever.DatabaseObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to build a directed dependency graph among DatabaseObject
+ * items and to order its nodes topologically.
+ */
+class DependencyGraphBuilder {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DependencyGraphBuilder.class.getName());
+
+  private DependencyGraphBuilder() {
+    // Prevent instantiation
+  }
+
+  /**
+   * Builds a directed Guava graph with one node per object and one edge per
+   * dependency, pointing from the source to the dependent.
+   *
+   * <p>The graph does not allow self-loops, so {@code dependencies} must not
+   * contain an entry whose source equals its dependent.
+   *
+   * @param connection the database connection (currently unused)
+   * @param schema the schema name (currently unused)
+   * @param objects the objects to add as nodes
+   * @param dependencies the dependencies to add as edges
+   * @return a MutableGraph<DatabaseObject>
+   * @throws SQLException declared for callers; not thrown by this method
+   */
+  public static MutableGraph<DatabaseObject> buildGraph(
+      Connection connection,
+      String schema,
+      List<DatabaseObject> objects,
+      List<DatabaseDependency> dependencies) throws SQLException {
+
+    // Build a directed graph
+    MutableGraph<DatabaseObject> graph = GraphBuilder
+        .directed()
+        .allowsSelfLoops(false)
+        .build();
+
+    // Add nodes first so that objects without dependencies are kept
+    for (var object : objects) {
+      graph.addNode(object);
+    }
+
+    // Add edges for dependencies (source -> dependent)
+    for (var dependency : dependencies) {
+      graph.putEdge(dependency.source(), dependency.dependent());
+    }
+
+    return graph;
+  }
+
+  /**
+   * Performs a topological sort of the given graph using Kahn's algorithm, so
+   * that dependencies appear before their dependents.
+   *
+   * <p>Nodes that take part in a cycle (or depend on one) cannot be ordered;
+   * they are left out of the result and a warning is logged.
+   *
+   * @param graph the dependency graph, with edges from source to dependent
+   * @return the nodes in an order where every source precedes its dependents
+   */
+  public static List<DatabaseObject> topologicalSort(
+      MutableGraph<DatabaseObject> graph) {
+    var remaining = new HashMap<DatabaseObject, Integer>();
+    var ready = new ArrayDeque<DatabaseObject>();
+
+    // Seed the queue by walking the graph's own node set rather than the
+    // hash map, so the starting order does not depend on hash iteration.
+    for (var node : graph.nodes()) {
+      int degree = graph.inDegree(node);
+      remaining.put(node, degree);
+      if (degree == 0) {
+        ready.add(node);
+      }
+    }
+
+    var result = new ArrayList<DatabaseObject>(graph.nodes().size());
+    while (!ready.isEmpty()) {
+      var current = ready.poll();
+      result.add(current);
+      for (var successor : graph.successors(current)) {
+        // A successor becomes ready once all its predecessors are emitted.
+        if (remaining.merge(successor, -1, Integer::sum) == 0) {
+          ready.add(successor);
+        }
+      }
+    }
+
+    if (result.size() < graph.nodes().size()) {
+      LOGGER.warn(
+          "Dependency cycle detected: {} of {} objects could not be ordered"
+              + " and were omitted",
+          graph.nodes().size() - result.size(), graph.nodes().size());
+    }
+
+    return result;
+  }
+
+}
diff --git 
a/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/MaterializedViewRefresher.java
 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/MaterializedViewRefresher.java
new file mode 100644
index 000000000..546afaa54
--- /dev/null
+++ 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/MaterializedViewRefresher.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.postgres.graph;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import 
org.apache.baremaps.postgres.graph.DatabaseMetadataRetriever.DatabaseIndex;
+import 
org.apache.baremaps.postgres.graph.DatabaseMetadataRetriever.DatabaseObject;
+import org.apache.baremaps.postgres.graph.DatabaseMetadataRetriever.ObjectType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to refresh materialized views in topological order: 1) drop
+ * indexes, 2) refresh MV, 3) recreate indexes.
+ */
+class MaterializedViewRefresher {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(MaterializedViewRefresher.class.getName());
+
+  private MaterializedViewRefresher() {
+    // Prevent instantiation
+  }
+
+  /**
+   * Traverses the sorted objects and, for every materialized view, drops its
+   * indexes, refreshes it, and recreates the indexes.
+   *
+   * <p>A failure on one view is logged and the traversal continues with the
+   * next object. Indexes that were dropped are recreated even when the
+   * refresh itself fails.
+   *
+   * @param connection the database connection
+   * @param sortedObjects the objects, ordered so that dependencies come
+   *        before dependents; objects that are not materialized views are
+   *        skipped
+   */
+  public static void refreshMaterializedViews(Connection connection,
+      List<DatabaseObject> sortedObjects) {
+    for (var obj : sortedObjects) {
+      if (obj.objectType() != ObjectType.MATERIALIZED_VIEW) {
+        continue;
+      }
+      LOGGER.info("Refreshing materialized view: {}.{}",
+          obj.schemaName(), obj.objectName());
+      try {
+        refreshWithIndexes(connection, obj);
+      } catch (SQLException ex) {
+        LOGGER.error("Error refreshing materialized view: {}.{}",
+            obj.schemaName(), obj.objectName(), ex);
+      }
+    }
+  }
+
+  /**
+   * Drops the indexes of one materialized view, refreshes it, and restores
+   * every index that was dropped, whether or not the refresh succeeded.
+   *
+   * <p>NOTE(review): restoring after a failure assumes each statement is
+   * committed on its own (auto-commit); confirm against callers that manage
+   * transactions themselves.
+   *
+   * @throws SQLException the first failure encountered; later failures are
+   *         attached to it as suppressed exceptions
+   */
+  private static void refreshWithIndexes(Connection connection,
+      DatabaseObject mv) throws SQLException {
+    var indexes = DatabaseMetadataRetriever.getIndexes(
+        connection, mv.schemaName(), mv.objectName());
+
+    int dropped = 0;
+    SQLException failure = null;
+    try {
+      for (var idx : indexes) {
+        dropIndex(connection, mv.schemaName(), idx);
+        dropped++;
+      }
+      refreshMaterializedView(connection, mv);
+    } catch (SQLException ex) {
+      failure = ex;
+    }
+
+    // Only the indexes that were actually dropped need to be recreated.
+    for (var idx : indexes.subList(0, dropped)) {
+      try {
+        recreateIndex(connection, idx);
+      } catch (SQLException ex) {
+        if (failure == null) {
+          failure = ex;
+        } else {
+          failure.addSuppressed(ex);
+        }
+      }
+    }
+
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * Drops a single index, addressing it by its quoted, schema-qualified name.
+   */
+  private static void dropIndex(Connection connection, String schema,
+      DatabaseIndex idx) throws SQLException {
+    LOGGER.info("Dropping index: {}", idx.indexName());
+    var dropSql = "DROP INDEX IF EXISTS "
+        + quoteIdentifier(schema) + "." + quoteIdentifier(idx.indexName());
+    try (var st = connection.createStatement()) {
+      st.execute(dropSql);
+    }
+  }
+
+  /**
+   * Issues {@code REFRESH MATERIALIZED VIEW ... WITH DATA} for the given view.
+   */
+  private static void refreshMaterializedView(Connection connection,
+      DatabaseObject mv) throws SQLException {
+    var refreshSql = "REFRESH MATERIALIZED VIEW "
+        + quoteIdentifier(mv.schemaName()) + "."
+        + quoteIdentifier(mv.objectName()) + " WITH DATA";
+    try (var st = connection.createStatement()) {
+      st.execute(refreshSql);
+    }
+  }
+
+  /**
+   * Recreates a single index by executing its stored definition.
+   */
+  private static void recreateIndex(Connection connection, DatabaseIndex idx)
+      throws SQLException {
+    LOGGER.info("Recreating index: {}", idx.indexName());
+    try (var st = connection.createStatement()) {
+      st.execute(idx.indexDef());
+    }
+  }
+
+  /**
+   * Wraps a catalog name in double quotes, doubling embedded quotes, so that
+   * mixed-case or otherwise special names are passed through unchanged.
+   */
+  private static String quoteIdentifier(String identifier) {
+    return "\"" + identifier.replace("\"", "\"\"") + "\"";
+  }
+}
diff --git 
a/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/RefreshApp.java
 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/RefreshApp.java
new file mode 100644
index 000000000..55fd9f8a2
--- /dev/null
+++ 
b/baremaps-postgres/src/main/java/org/apache/baremaps/postgres/graph/RefreshApp.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.postgres.graph;
+
+import java.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line entry point that connects to the database and refreshes the
+ * materialized views of one schema in dependency order.
+ */
+public class RefreshApp {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(RefreshApp.class.getName());
+
+  private static final String DB_URL =
+      "jdbc:postgresql://localhost:5432/baremaps";
+  private static final String DB_USER = "baremaps";
+  private static final String DB_PASSWORD = "baremaps";
+  private static final String SCHEMA = "public";
+
+  /**
+   * Opens the connection, runs the refresh, and logs any database error.
+   *
+   * @param args ignored
+   */
+  public static void main(String[] args) {
+    try (Connection connection =
+        DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
+      LOGGER.info("Connected to PostgreSQL database.");
+      refreshSchema(connection);
+      LOGGER.info("Done refreshing materialized views.");
+    } catch (SQLException ex) {
+      LOGGER.error("Database error", ex);
+    }
+  }
+
+  /**
+   * Collects the objects and dependencies of {@code SCHEMA}, orders them so
+   * that dependencies come before dependents, and refreshes the materialized
+   * views in that order.
+   */
+  private static void refreshSchema(Connection connection)
+      throws SQLException {
+    // Tables, views, and materialized views of the schema.
+    var objects = DatabaseMetadataRetriever.getObjects(connection, SCHEMA);
+
+    // Edges between those objects.
+    var dependencies =
+        DatabaseMetadataRetriever.getDependencies(connection, SCHEMA, objects);
+
+    // Directed dependency graph, then its topological order.
+    var graph = DependencyGraphBuilder.buildGraph(
+        connection, SCHEMA, objects, dependencies);
+    var ordered = DependencyGraphBuilder.topologicalSort(graph);
+
+    // Refresh each materialized view, dropping and recreating its indexes.
+    MaterializedViewRefresher.refreshMaterializedViews(connection, ordered);
+  }
+}

Reply via email to