This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d119b29da2a [SQL] add SHOW command (#36509)
d119b29da2a is described below

commit d119b29da2a40d43b88134fff3859a5f409f914c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Dec 9 13:45:39 2025 -0500

    [SQL] add SHOW command (#36509)
    
    * add SHOW command
    
    * add SHOW CURRENT
    
    * add SHOW CURRENT
    
    * add LIKE pattern
    
    * spotless
    
    * use SqlCall; remove @Nullable
    
    * spotless
    
    * minor fixes
    
    * use PrintWriter and flush all at once
    
    * correct PrintWriter usage
    
    * cleanup
    
    * fix name
    
    * address comments
    
    * spotless
---
 .github/trigger_files/beam_PostCommit_SQL.json     |   3 +-
 .github/trigger_files/beam_PreCommit_SQL.json      |   2 +-
 .../sql/meta/provider/iceberg/IcebergCatalog.java  |   6 +
 .../extensions/sql/jdbc/BeamSqlLineShowTest.java   | 306 +++++++++++++++++++++
 .../extensions/sql/src/main/codegen/config.fmpp    |   7 +
 .../sql/src/main/codegen/includes/parserImpls.ftl  | 191 +++++++++++++
 .../sdk/extensions/sql/impl/BeamCalciteSchema.java |   4 +
 .../sql/impl/BeamSystemDbMetadataSchema.java       | 111 ++++++++
 .../sdk/extensions/sql/impl/BeamSystemSchema.java  | 155 +++++++++++
 .../sql/impl/BeamSystemTableMetadataSchema.java    | 126 +++++++++
 .../extensions/sql/impl/CatalogManagerSchema.java  |  34 ++-
 .../sql/impl/parser/SqlCreateExternalTable.java    |   2 +-
 .../extensions/sql/impl/parser/SqlDdlNodes.java    |   2 +-
 .../extensions/sql/impl/parser/SqlUseDatabase.java |   2 +-
 .../beam/sdk/extensions/sql/meta/SystemTables.java | 182 ++++++++++++
 .../sdk/extensions/sql/meta/catalog/Catalog.java   |   4 +
 .../sql/meta/catalog/InMemoryCatalog.java          |   6 +
 .../sdk/extensions/sql/BeamSqlCliDatabaseTest.java |   4 +-
 18 files changed, 1134 insertions(+), 13 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_SQL.json 
b/.github/trigger_files/beam_PostCommit_SQL.json
index 3700163b299..5df3841d236 100644
--- a/.github/trigger_files/beam_PostCommit_SQL.json
+++ b/.github/trigger_files/beam_PostCommit_SQL.json
@@ -1,3 +1,4 @@
 {
-  "https://github.com/apache/beam/pull/36890": "fixing some null errors"
+  "comment": "Modify this file in a trivial way to cause this test suite to 
run ",
+  "modification": 3
 }
diff --git a/.github/trigger_files/beam_PreCommit_SQL.json 
b/.github/trigger_files/beam_PreCommit_SQL.json
index 5abe02fc09c..ab4daeae234 100644
--- a/.github/trigger_files/beam_PreCommit_SQL.json
+++ b/.github/trigger_files/beam_PreCommit_SQL.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 1
+  "modification": 3
 }
diff --git 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
index 0ca38824204..7dee72511e8 100644
--- 
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
+++ 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
@@ -19,6 +19,7 @@ package 
org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
@@ -71,6 +72,11 @@ public class IcebergCatalog extends InMemoryCatalog {
     return catalogConfig.createNamespace(database);
   }
 
+  @Override
+  public Collection<String> databases() {
+    return catalogConfig.listNamespaces();
+  }
+
   @Override
   public void useDatabase(String database) {
     checkArgument(databaseExists(database), "Database '%s' does not exist.");
diff --git 
a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java
 
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java
new file mode 100644
index 00000000000..0b593a1b2cf
--- /dev/null
+++ 
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.jdbc;
+
+import static 
org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.buildArgs;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class BeamSqlLineShowTest {
+  @Test
+  public void testShowTables() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE other_db",
+            "CREATE EXTERNAL TABLE other_db.should_not_show_up (id int, name 
varchar) TYPE 'text'",
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "CREATE DATABASE my_catalog.my_db",
+            "USE DATABASE my_catalog.my_db",
+            "CREATE EXTERNAL TABLE my_table (id int, name varchar) TYPE 
'text'",
+            "CREATE EXTERNAL TABLE my_other_table (col1 int, col2 timestamp) 
TYPE 'text'",
+            "CREATE EXTERNAL TABLE my_other_table_with_a_long_name (foo 
varchar, bar boolean) TYPE 'test'",
+            "SHOW TABLES");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    System.out.println(byteArrayOutputStream.toString("UTF-8"));
+    assertThat(
+        Arrays.asList(
+            "+------+------+",
+            "| NAME | TYPE |",
+            "+------+------+",
+            "| my_other_table | text |",
+            "| my_other_table_with_a_long_name | test |",
+            "| my_table | text |",
+            "+------+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowTablesInOtherDatabase() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE my_db",
+            "USE DATABASE my_db",
+            "CREATE EXTERNAL TABLE should_not_show_up (id int, name varchar) 
TYPE 'text'",
+            "CREATE CATALOG other_catalog TYPE 'local'",
+            "CREATE DATABASE other_catalog.other_db",
+            "CREATE EXTERNAL TABLE other_catalog.other_db.other_table (id int, 
name varchar) TYPE 'text'",
+            "SHOW TABLES IN other_catalog.other_db");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList(
+            "+------+------+",
+            "| NAME | TYPE |",
+            "+------+------+",
+            "| other_table | text |",
+            "+------+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowTablesWithPattern() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE my_db",
+            "USE DATABASE my_db",
+            "CREATE EXTERNAL TABLE my_table (id int, name varchar) TYPE 
'text'",
+            "CREATE EXTERNAL TABLE my_table_2 (id int, name varchar) TYPE 
'text'",
+            "CREATE EXTERNAL TABLE my_foo_table_1 (id int, name varchar) TYPE 
'text'",
+            "CREATE EXTERNAL TABLE my_foo_table_2 (id int, name varchar) TYPE 
'text'",
+            "SHOW TABLES LIKE '%foo%'");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList(
+            "+------+------+",
+            "| NAME | TYPE |",
+            "+------+------+",
+            "| my_foo_table_1 | text |",
+            "| my_foo_table_2 | text |",
+            "+------+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowCurrentDatabase() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE should_not_show_up",
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "USE CATALOG my_catalog",
+            "CREATE DATABASE my_db",
+            "CREATE DATABASE my_other_db",
+            "CREATE DATABASE my_database_that_has_a_very_long_name",
+            "USE DATABASE my_other_db",
+            "SHOW CURRENT database");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList("+------+", "| NAME |", "+------+", "| my_other_db |", 
"+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowCurrentDatabaseWithNoneSet() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE should_not_show_up",
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "USE CATALOG my_catalog",
+            "DROP DATABASE `default`",
+            "SHOW CURRENT DATABASE");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList("+------+", "| NAME |", "+------+", "+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowDatabases() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE should_not_show_up",
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "USE CATALOG my_catalog",
+            "CREATE DATABASE my_db",
+            "CREATE DATABASE my_other_db",
+            "CREATE DATABASE my_database_that_has_a_very_long_name",
+            "SHOW DATABASES");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    System.out.println(byteArrayOutputStream.toString("UTF-8"));
+    assertThat(
+        Arrays.asList(
+            "+------+",
+            "| NAME |",
+            "+------+",
+            "| default |",
+            "| my_database_that_has_a_very_long_name |",
+            "| my_db |",
+            "| my_other_db |",
+            "+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowDatabasesInOtherCatalog() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE DATABASE should_not_show_up",
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "USE CATALOG my_catalog",
+            "CREATE DATABASE my_db",
+            "CREATE CATALOG my_other_catalog TYPE 'local'",
+            "CREATE DATABASE my_other_catalog.other_db",
+            "SHOW DATABASES FROM my_other_catalog");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList(
+            "+------+", "| NAME |", "+------+", "| default |", "| other_db |", 
"+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowDatabasesWithPattern() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "CREATE DATABASE my_catalog.my_db",
+            "CREATE DATABASE my_catalog.other_db",
+            "CREATE DATABASE my_catalog.some_foo_db",
+            "CREATE DATABASE my_catalog.some_other_foo_db",
+            "SHOW DATABASES FROM my_catalog LIKE '%foo%'");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList(
+            "+------+",
+            "| NAME |",
+            "+------+",
+            "| some_foo_db |",
+            "| some_other_foo_db |",
+            "+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowCurrentCatalog() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "CREATE CATALOG my_very_long_catalog_name TYPE 'local'",
+            "SHOW CURRENT CATALOG");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList(
+            "+------+------+",
+            "| NAME | TYPE |",
+            "+------+------+",
+            "| default | local |",
+            "+------+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowCatalogs() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "CREATE CATALOG my_very_long_catalog_name TYPE 'local'",
+            "SHOW CATALOGS");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    System.out.println(byteArrayOutputStream.toString("UTF-8"));
+    assertThat(
+        Arrays.asList(
+            "+------+------+",
+            "| NAME | TYPE |",
+            "+------+------+",
+            "| default | local |",
+            "| my_catalog | local |",
+            "| my_very_long_catalog_name | local |",
+            "+------+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+
+  @Test
+  public void testShowCatalogsWithPattern() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    String[] args =
+        buildArgs(
+            "CREATE CATALOG my_catalog TYPE 'local'",
+            "CREATE CATALOG my_catalog_2 TYPE 'local'",
+            "CREATE CATALOG my_very_long_catalog_name TYPE 'local'",
+            "SHOW CATALOGS LIKE 'my_catalog%'");
+
+    BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+    List<String> lines = 
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+    assertThat(
+        Arrays.asList(
+            "+------+------+",
+            "| NAME | TYPE |",
+            "+------+------+",
+            "| my_catalog | local |",
+            "| my_catalog_2 | local |",
+            "+------+------+"),
+        everyItem(is(oneOf(lines.toArray()))));
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp 
b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 77772c5858e..73af7e18150 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -50,6 +50,9 @@ data: {
         "TBLPROPERTIES"
         "PROPERTIES"
         "PARTITIONED"
+        "CATALOGS"
+        "DATABASES"
+        "TABLES"
         "USE"
       ]
 
@@ -422,6 +425,10 @@ data: {
       # Return type of method implementation should be 'SqlNode'.
       # Example: SqlShowDatabases(), SqlShowTables().
       statementParserMethods: [
+        "SqlShowTables(Span.of())"
+        "SqlShowDatabases(Span.of())"
+        "SqlShowCatalogs(Span.of())"
+        "SqlShowCurrent(Span.of())"
         "SqlUseCatalog(Span.of(), null)"
         "SqlUseDatabase(Span.of(), null)"
         "SqlSetOptionBeam(Span.of(), null)"
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl 
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index 46102c7b92f..d3bb8c2af56 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -264,6 +264,47 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
     }
 }
 
+/**
+ * SHOW CATALOGS [ LIKE pattern ]
+ */
+SqlCall SqlShowCatalogs(Span s) :
+{
+    SqlNode regex = null;
+}
+{
+    <SHOW> <CATALOGS> { s.add(this); }
+    [ <LIKE> regex = StringLiteral() ]
+    {
+        List<String> path = new ArrayList<String>();
+        path.add("beamsystem");
+        path.add("catalogs");
+        SqlNodeList selectList = 
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+        SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+        SqlNode where = null;
+        if (regex != null) {
+            SqlIdentifier nameIdentifier = new SqlIdentifier("NAME", 
s.end(this));
+            where = SqlStdOperatorTable.LIKE.createCall(
+                s.end(this),
+                nameIdentifier, regex);
+        }
+
+        return new SqlSelect(
+            s.end(this),
+            null,
+            selectList,
+            from,
+            where,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null);
+    }
+}
+
+
 /**
  * CREATE DATABASE ( IF NOT EXISTS )? ( catalog_name '.' )? database_name
  */
@@ -331,6 +372,98 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
     }
 }
 
+/**
+ * SHOW DATABASES [ ( FROM | IN ) catalog_name ] [ LIKE pattern ]
+ */
+SqlCall SqlShowDatabases(Span s) :
+{
+    SqlIdentifier catalogName = null;
+    SqlNode regex = null;
+}
+{
+    <SHOW> <DATABASES> { s.add(this); }
+    [ ( <FROM> | <IN> ) catalogName = SimpleIdentifier() ]
+    [ <LIKE> regex = StringLiteral() ]
+    {
+        List<String> path = new ArrayList<String>();
+        path.add("beamsystem");
+        path.add("databases");
+        SqlNodeList selectList = 
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+        SqlNode where = null;
+        if (regex != null) {
+            SqlIdentifier nameIdentifier = new SqlIdentifier("NAME", 
s.end(this));
+            where = SqlStdOperatorTable.LIKE.createCall(
+                s.end(this),
+                nameIdentifier, regex);
+        }
+        if (catalogName != null) {
+            path.add(catalogName.getSimple());
+        } else {
+            path.add("__current_catalog__");
+        }
+        SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+
+        return new SqlSelect(
+            s.end(this),
+            null,
+            selectList,
+            from,
+            where,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null);
+    }
+}
+
+/**
+ * SHOW CURRENT ( CATALOG | DATABASE )
+ */
+SqlCall SqlShowCurrent(Span s) :
+{
+}
+{
+    <SHOW> <CURRENT> { s.add(this); }
+    {
+        List<String> path = new ArrayList<String>();
+        path.add("beamsystem");
+    }
+    (
+        <CATALOG> {
+            path.add("__current_catalog__");
+        }
+    |
+        <DATABASE> {
+            path.add("__current_database__");
+        }
+    )
+    {
+        if (path.size() != 2) {
+            throw new ParseException(
+                "Expected SHOW CURRENT CATALOG or SHOW CURRENT DATABASE");
+        }
+        SqlNodeList selectList = 
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+        SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+
+        return new SqlSelect(
+            s.end(this),
+            null,
+            selectList,
+            from,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null);
+    }
+}
+
 
 SqlNodeList PartitionFieldList() :
 {
@@ -456,6 +589,64 @@ SqlDrop SqlDropTable(Span s, boolean replace) :
     }
 }
 
+/**
+ * SHOW TABLES [ ( FROM | IN ) [ catalog_name '.' ] database_name ] [ LIKE 
pattern ]
+ */
+SqlCall SqlShowTables(Span s) :
+{
+    SqlIdentifier databaseCatalog = null;
+    SqlNode regex = null;
+}
+{
+    <SHOW> <TABLES> { s.add(this); }
+    [ (<FROM> | <IN>) databaseCatalog = CompoundIdentifier() ]
+    [ <LIKE> regex = StringLiteral() ]
+    {
+        List<String> path = new ArrayList<String>();
+        path.add("beamsystem");
+        path.add("tables");
+        SqlNodeList selectList = 
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+        SqlNode where = null;
+        if (regex != null) {
+            SqlIdentifier nameIdentifier = new SqlIdentifier("NAME", 
s.end(this));
+            where = SqlStdOperatorTable.LIKE.createCall(
+                s.end(this),
+                nameIdentifier, regex);
+        }
+        if (databaseCatalog != null) {
+            List<String> components = databaseCatalog.names;
+            if (components.size() == 1) {
+                path.add("__current_catalog__");
+                path.add(components.get(0));
+            } else if (components.size() == 2) {
+                path.addAll(components);
+            } else {
+                throw new ParseException(
+                    "SHOW TABLES FROM/IN accepts at most a catalog name and a 
database name.");
+            }
+        } else {
+            path.add("__current_catalog__");
+            path.add("__current_database__");
+        }
+        SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+
+        return new SqlSelect(
+            s.end(this),
+            null,
+            selectList,
+            from,
+            where,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null);
+    }
+}
+
+
 Schema.FieldType FieldType() :
 {
     final SqlTypeName collectionTypeName;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
index 6ef6e82e6a7..f7783e7c3ec 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
@@ -129,6 +129,10 @@ public class BeamCalciteSchema implements Schema {
         connection.getPipelineOptions());
   }
 
+  public Collection<Table> getTables() {
+    return tableProvider.getTables().values();
+  }
+
   @Override
   public Set<String> getFunctionNames() {
     return Collections.emptySet();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java
new file mode 100644
index 00000000000..66c05a35313
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.meta.SystemTables;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** A Calcite {@link Schema} responsible for {@code SHOW DATABASES} requests. 
*/
+public class BeamSystemDbMetadataSchema implements Schema {
+  private final CatalogManager catalogManager;
+
+  BeamSystemDbMetadataSchema(CatalogManager catalogManager) {
+    this.catalogManager = catalogManager;
+  }
+
+  @Override
+  public @Nullable Table getTable(String catalogName) {
+    Catalog catalog;
+    if (catalogName.equals("__current_catalog__")) {
+      catalog = catalogManager.currentCatalog();
+    } else {
+      catalog =
+          checkArgumentNotNull(
+              catalogManager.getCatalog(catalogName), "Catalog '%s' does not 
exist.", catalogName);
+    }
+
+    return BeamCalciteTable.of(SystemTables.databases(catalog, false));
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return 
catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet());
+  }
+
+  @Override
+  public @Nullable Schema getSubSchema(@Nullable String name) {
+    return null;
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<String> getTypeNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public @Nullable RelProtoDataType getType(String s) {
+    return null;
+  }
+
+  @Override
+  public Collection<Function> getFunctions(String s) {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<String> getFunctionNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) {
+    return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s, 
getClass());
+  }
+
+  @Override
+  public boolean isMutable() {
+    return true;
+  }
+
+  @Override
+  public Schema snapshot(SchemaVersion schemaVersion) {
+    return this;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java
new file mode 100644
index 00000000000..c9f7c417ca9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.SystemTables;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A Calcite {@link Schema} specialized for displaying the session's metadata. 
Top node that manages
+ * requests to {@code SHOW} {@code CATALOGS}, {@code DATABASES}, and {@code 
TABLES}. Used by {@link
+ * CatalogManagerSchema}.
+ *
+ * <p>{@code SHOW} requests are treated as aliases, listed below:
+ *
+ * <ul>
+ *   <li>{@code SHOW CURRENT CATALOG} --> {@code SELECT * FROM 
`beamsystem`.`__current_catalog__`}
+ *   <li>{@code SHOW CATALOGS} --> {@code SELECT * FROM 
`beamsystem`.`catalogs`}
+ *   <li>{@code SHOW CATALOGS LIKE '{pattern}'} --> {@code SELECT * FROM 
`beamsystem`.`catalogs`
+ *       WHERE NAME LIKE '{pattern}'}
+ *   <li>{@code SHOW CURRENT DATABASE} --> {@code SELECT * FROM 
`beamsystem`.`__current_database__`}
+ *   <li>{@code SHOW DATABASES} --> {@code SELECT * FROM
+ *       `beamsystem`.`databases`.`__current_catalog__`}
+ *   <li>{@code SHOW DATABASES FROM my_catalog} --> {@code SELECT * FROM
+ *       `beamsystem`.`databases`.`my_catalog`}
+ *   <li>{@code SHOW DATABASES FROM my_catalog LIKE '{pattern}'} --> {@code 
SELECT * FROM
+ *       `beamsystem`.`databases`.`my_catalog` WHERE NAME LIKE '{pattern}'}
+ *   <li>{@code SHOW TABLES} --> {@code SELECT * FROM
+ *       `beamsystem`.`tables`.`__current_catalog__`.`__current_database__`}
+ *   <li>{@code SHOW TABLES FROM my_db} --> {@code SELECT * FROM
+ *       `beamsystem`.`tables`.`__current_catalog__`.`my_db`}
+ *   <li>{@code SHOW TABLES FROM my_catalog.my_db} --> {@code SELECT * FROM
+ *       `beamsystem`.`tables`.`my_catalog`.`my_db`}
+ *   <li>{@code SHOW TABLES FROM my_catalog.my_db LIKE '{pattern}'} --> {@code 
SELECT * FROM
+ *       `beamsystem`.`tables`.`my_catalog`.`my_db` WHERE NAME LIKE 
'{pattern}'}
+ * </ul>
+ */
+public class BeamSystemSchema implements Schema {
+  /** Name under which {@link CatalogManagerSchema} exposes this schema as a sub-schema. */
+  public static final String BEAMSYSTEM = "beamsystem";
+
+  // Names visible to users: one table and two nested schemas.
+  private static final String CATALOGS = "catalogs";
+  private static final String DATABASES = "databases";
+  private static final String TABLES = "tables";
+
+  // Reserved table names that resolve against the session's current catalog / database.
+  private static final String CURRENT_CATALOG = "__current_catalog__";
+  private static final String CURRENT_DATABASE = "__current_database__";
+
+  private final CatalogManager catalogManager;
+  private final BeamSystemDbMetadataSchema dbSchema;
+  private final BeamSystemTableMetadataSchema tableSchema;
+
+  /**
+   * Creates the system schema together with its {@code databases} and {@code tables} sub-schemas.
+   *
+   * @param catalogManager source of catalog metadata for every system table
+   */
+  BeamSystemSchema(CatalogManager catalogManager) {
+    this.catalogManager = catalogManager;
+    this.dbSchema = new BeamSystemDbMetadataSchema(catalogManager);
+    // The top-level tables schema is created without a catalog reference.
+    this.tableSchema = new BeamSystemTableMetadataSchema(catalogManager, null);
+  }
+
+  /**
+   * Resolves one of the system tables served directly by this schema.
+   *
+   * @param table {@code catalogs}, {@code __current_catalog__} or {@code __current_database__}
+   * @return the matching metadata table, or {@code null} for any other name
+   */
+  @Override
+  public @Nullable Table getTable(String table) {
+    if (table.equals(CATALOGS)) {
+      return BeamCalciteTable.of(SystemTables.catalogs(catalogManager, false));
+    }
+    if (table.equals(CURRENT_CATALOG)) {
+      return BeamCalciteTable.of(SystemTables.catalogs(catalogManager, true));
+    }
+    if (table.equals(CURRENT_DATABASE)) {
+      return BeamCalciteTable.of(SystemTables.databases(catalogManager.currentCatalog(), true));
+    }
+    return null;
+  }
+
+  /** Advertises only {@code catalogs}; the {@code __current_*__} tables are not listed. */
+  @Override
+  public Set<String> getTableNames() {
+    return ImmutableSet.of(CATALOGS);
+  }
+
+  /**
+   * Returns the nested schema for {@code databases} or {@code tables}.
+   *
+   * @param name sub-schema name; may be {@code null}
+   * @return the matching sub-schema, or {@code null} when {@code name} is null or unknown
+   */
+  @Override
+  public @Nullable Schema getSubSchema(@Nullable String name) {
+    if (DATABASES.equals(name)) {
+      return dbSchema;
+    }
+    if (TABLES.equals(name)) {
+      return tableSchema;
+    }
+    return null;
+  }
+
+  /** Returns the names of the two nested schemas, {@code databases} and {@code tables}. */
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return ImmutableSet.of(DATABASES, TABLES);
+  }
+
+  /** Returns an empty set: this schema defines no custom types. */
+  @Override
+  public Set<String> getTypeNames() {
+    return Collections.emptySet();
+  }
+
+  /** Always returns {@code null}: no custom type is registered under any name. */
+  @Override
+  public @Nullable RelProtoDataType getType(String s) {
+    return null;
+  }
+
+  /** Returns an empty collection: this schema provides no functions for any name. */
+  @Override
+  public Collection<Function> getFunctions(String s) {
+    return Collections.emptySet();
+  }
+
+  /** Returns an empty set: this schema provides no functions. */
+  @Override
+  public Set<String> getFunctionNames() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Builds the expression used to reference this schema as the sub-schema {@code s} of its parent.
+   *
+   * @param schemaPlus the parent schema; must not be {@code null}
+   * @param s the name of this schema within the parent
+   */
+  @Override
+  public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) {
+    SchemaPlus parent = checkStateNotNull(schemaPlus);
+    return Schemas.subSchemaExpression(parent, s, getClass());
+  }
+
+  /** Always reports this schema as mutable. */
+  @Override
+  public boolean isMutable() {
+    return true;
+  }
+
+  /** Returns this same instance regardless of the requested version; no copy is made. */
+  @Override
+  public Schema snapshot(SchemaVersion schemaVersion) {
+    return this;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java
new file mode 100644
index 00000000000..b081a1b886c
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.SystemTables;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A Calcite {@link Schema} responsible for {@code SHOW TABLES} requests.
+ *
+ * <p>Instances form two levels. The top-level instance holds no catalog reference and only hands
+ * out child schemas via {@link #getSubSchema(String)}. A child instance is bound to one catalog name
+ * and serves one table per database via {@link #getTable(String)}.
+ */
+public class BeamSystemTableMetadataSchema implements Schema {
+  private final CatalogManager catalogManager;
+  // Name of the catalog this instance is bound to; null for the top-level instance.
+  private final @MonotonicNonNull String catalog;
+
+  /**
+   * @param catalogManager source of catalog metadata
+   * @param catalog name of the catalog to bind to, or {@code null} for the top-level instance
+   */
+  BeamSystemTableMetadataSchema(CatalogManager catalogManager, @Nullable String catalog) {
+    this.catalogManager = catalogManager;
+    this.catalog = catalog;
+  }
+
+  /**
+   * Returns the table listing the tables of database {@code dbName} in the bound catalog.
+   *
+   * <p>The reserved name {@code __current_database__} is replaced by the catalog's current database.
+   *
+   * @param dbName database name, or {@code __current_database__}
+   * @return the tables listing, or {@code null} when this instance is not bound to a catalog
+   * @throws IllegalArgumentException if the bound catalog does not exist
+   * @throws IllegalStateException if the current database is requested but the catalog has none
+   */
+  @Override
+  public @Nullable Table getTable(String dbName) {
+    // returns a table if this instance has a catalog referenced
+    if (catalog == null) {
+      return null;
+    }
+
+    Catalog cat =
+        checkArgumentNotNull(
+            catalogManager.getCatalog(catalog), "Catalog '%s' does not exist.", catalog);
+    if (dbName.equals("__current_database__")) {
+      // Pass the catalog name so the '%s' placeholder is filled in instead of printed verbatim.
+      dbName =
+          checkStateNotNull(
+              cat.currentDatabase(),
+              "Catalog '%s' has not set a default database. Please specify one.",
+              catalog);
+    }
+    return BeamCalciteTable.of(SystemTables.tables(cat, dbName));
+  }
+
+  /** Returns an empty set: database names are not enumerated by this schema. */
+  @Override
+  public Set<String> getTableNames() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Returns a child schema bound to {@code catalogName}.
+   *
+   * <p>The reserved name {@code __current_catalog__} is replaced by the name of the catalog
+   * manager's current catalog. The catalog's existence is not checked here; it is checked when the
+   * child's {@link #getTable(String)} is called.
+   *
+   * @param catalogName catalog name, or {@code __current_catalog__}; may be {@code null}
+   * @return a new child schema, or {@code null} when this instance is already bound to a catalog or
+   *     {@code catalogName} is {@code null}
+   */
+  @Override
+  public @Nullable Schema getSubSchema(@Nullable String catalogName) {
+    // if this is a top instance (i.e. no catalog reference), return child schema with the specified
+    // catalog referenced
+    if (catalog == null && catalogName != null) {
+      if (catalogName.equals("__current_catalog__")) {
+        catalogName = catalogManager.currentCatalog().name();
+      }
+      return new BeamSystemTableMetadataSchema(catalogManager, catalogName);
+    }
+    return null;
+  }
+
+  /** Returns an empty set: catalog names are not enumerated by this schema. */
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return Collections.emptySet();
+  }
+
+  /** Returns an empty set: this schema defines no custom types. */
+  @Override
+  public Set<String> getTypeNames() {
+    return Collections.emptySet();
+  }
+
+  /** Always returns {@code null}: no custom type is registered under any name. */
+  @Override
+  public @Nullable RelProtoDataType getType(String s) {
+    return null;
+  }
+
+  /** Returns an empty collection: this schema provides no functions for any name. */
+  @Override
+  public Collection<Function> getFunctions(String s) {
+    return Collections.emptySet();
+  }
+
+  /** Returns an empty set: this schema provides no functions. */
+  @Override
+  public Set<String> getFunctionNames() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Builds the expression used to reference this schema as the sub-schema {@code s} of its parent.
+   *
+   * @param schemaPlus the parent schema; must not be {@code null}
+   * @param s the name of this schema within the parent
+   */
+  @Override
+  public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) {
+    return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s, getClass());
+  }
+
+  /** Always reports this schema as mutable. */
+  @Override
+  public boolean isMutable() {
+    return true;
+  }
+
+  /** Returns this same instance regardless of the requested version; no copy is made. */
+  @Override
+  public Schema snapshot(SchemaVersion schemaVersion) {
+    return this;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
index ec225efc1c3..098b72b2869 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
 
@@ -43,6 +44,7 @@ import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifi
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,11 +57,13 @@ public class CatalogManagerSchema implements Schema {
   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManagerSchema.class);
   private final JdbcConnection connection;
   private final CatalogManager catalogManager;
+  private final BeamSystemSchema beamSystemSchema;
   private final Map<String, CatalogSchema> catalogSubSchemas = new HashMap<>();
 
   CatalogManagerSchema(JdbcConnection jdbcConnection, CatalogManager 
catalogManager) {
     this.connection = jdbcConnection;
     this.catalogManager = catalogManager;
+    this.beamSystemSchema = new BeamSystemSchema(catalogManager);
   }
 
   @VisibleForTesting
@@ -176,15 +180,23 @@ public class CatalogManagerSchema implements Schema {
     return getCurrentCatalogSchema().getTableNames();
   }
 
+  /**
+   * Returns the {@link CatalogSchema} for the catalog referenced in this {@link TableName}. If the
+   * path does not reference a catalog, the currently used {@link CatalogSchema} will be returned.
+   */
   public CatalogSchema getCatalogSchema(TableName tablePath) {
-    @Nullable Schema catalogSchema = getSubSchema(tablePath.catalog());
-    if (catalogSchema == null) {
-      catalogSchema = getCurrentCatalogSchema();
-    }
+    return tablePath.catalog() != null
+        ? getCatalogSchema(tablePath.catalog())
+        : getCurrentCatalogSchema();
+  }
+
+  public CatalogSchema getCatalogSchema(@Nullable String catalog) {
+    Schema catalogSchema =
+        checkArgumentNotNull(getSubSchema(catalog), "Catalog '%s' not found.", 
catalog);
     Preconditions.checkState(
         catalogSchema instanceof CatalogSchema,
         "Unexpected Schema type for Catalog '%s': %s",
-        tablePath.catalog(),
+        catalog,
         catalogSchema.getClass());
     return (CatalogSchema) catalogSchema;
   }
@@ -202,6 +214,9 @@ public class CatalogManagerSchema implements Schema {
     if (name == null) {
       return null;
     }
+    if (name.equals(BeamSystemSchema.BEAMSYSTEM)) {
+      return beamSystemSchema;
+    }
     @Nullable CatalogSchema catalogSchema = catalogSubSchemas.get(name);
     if (catalogSchema == null) {
       @Nullable Catalog catalog = catalogManager.getCatalog(name);
@@ -222,7 +237,14 @@ public class CatalogManagerSchema implements Schema {
 
   @Override
   public Set<String> getSubSchemaNames() {
-    return 
catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet());
+    return ImmutableSet.<String>builder()
+        
.addAll(catalogs().stream().map(Catalog::name).collect(Collectors.toSet()))
+        .add(BeamSystemSchema.BEAMSYSTEM)
+        .build();
+  }
+
+  public Collection<Catalog> catalogs() {
+    return catalogManager.catalogs();
   }
 
   public void setPipelineOption(String key, String value) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
index ab644145b4f..de7903897b6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
@@ -159,7 +159,7 @@ public class SqlCreateExternalTable extends SqlCreate 
implements BeamSqlParser.E
       CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) 
schema;
       catalogManagerSchema.maybeRegisterProvider(pathOverride, 
SqlDdlNodes.getString(type));
 
-      CatalogSchema catalogSchema = 
catalogManagerSchema.getCatalogSchema(pathOverride);
+      CatalogSchema catalogSchema = ((CatalogManagerSchema) 
schema).getCatalogSchema(pathOverride);
       beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
     } else if (schema instanceof BeamCalciteSchema) {
       beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index c5d162ebbb6..6f4d8ee79d9 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -85,7 +85,7 @@ public class SqlDdlNodes {
     }
   }
 
-  static @Nullable String getString(SqlNode n) {
+  static @Nullable String getString(@Nullable SqlNode n) {
     if (n == null) {
       return null;
     }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
index 9d06e471dbb..f0e3fa59ddc 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
@@ -78,7 +78,7 @@ public class SqlUseDatabase extends SqlSetOption implements 
BeamSqlParser.Execut
     }
 
     CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
-    CatalogSchema catalogSchema = 
catalogManagerSchema.getCatalogSchema(pathOverride);
+    CatalogSchema catalogSchema = ((CatalogManagerSchema) 
schema).getCatalogSchema(pathOverride);
     // if database exists in a different catalog, we need to also switch to 
that catalog
     if (pathOverride.catalog() != null
         && !pathOverride
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java
new file mode 100644
index 00000000000..8e91e9eb030
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Provides {@link BeamSqlTable}s that track metadata around catalogs, 
databases, and tables. For
+ * now, it tracks the following:
+ *
+ * <ul>
+ *   <li>Catalogs: Name and Type
+ *   <li>Databases: Name
+ *   <li>Tables: Name and Type
+ * </ul>
+ */
+public class SystemTables {
+  // Static factory holder; not meant to be instantiated.
+  private SystemTables() {}
+
+  /**
+   * Creates a table listing catalogs.
+   *
+   * @param catalogManager source of the catalogs
+   * @param currentOnly if true, list only the current catalog; otherwise list all catalogs
+   */
+  public static CatalogsMetaTable catalogs(CatalogManager catalogManager, boolean currentOnly) {
+    return new CatalogsMetaTable(catalogManager, currentOnly);
+  }
+
+  /**
+   * Creates a table listing the databases of {@code catalog}.
+   *
+   * @param catalog catalog whose databases are listed
+   * @param currentOnly if true, list only the catalog's current database (possibly none)
+   */
+  public static DatabasesMetaTable databases(Catalog catalog, boolean currentOnly) {
+    return new DatabasesMetaTable(catalog, currentOnly);
+  }
+
+  /**
+   * Creates a table listing the tables of database {@code dbName} in {@code catalog}.
+   *
+   * @param catalog catalog that owns the database
+   * @param dbName name of the database whose tables are listed
+   */
+  public static TablesMetaTable tables(Catalog catalog, String dbName) {
+    return new TablesMetaTable(catalog, dbName);
+  }
+
+  /** Read-only table with one row ({@code NAME}, {@code TYPE}) per catalog. */
+  public static class CatalogsMetaTable extends BaseBeamTable {
+    private final CatalogManager catalogManager;
+    private final boolean currentOnly;
+
+    private static final Schema SCHEMA =
+        Schema.builder().addStringField("NAME").addStringField("TYPE").build();
+
+    public CatalogsMetaTable(CatalogManager catalogManager, boolean currentOnly) {
+      this.catalogManager = catalogManager;
+      this.currentOnly = currentOnly;
+    }
+
+    /**
+     * Reads the catalog list when this method is called and emits it as in-memory rows; later
+     * catalog changes are not reflected in the returned collection.
+     */
+    @Override
+    public PCollection<Row> buildIOReader(PBegin begin) {
+      Collection<Catalog> catalogs =
+          currentOnly
+              ? ImmutableList.of(catalogManager.currentCatalog())
+              : catalogManager.catalogs();
+      List<Row> rows =
+          catalogs.stream()
+              .map(cat -> Row.withSchema(SCHEMA).addValues(cat.name(), cat.type()).build())
+              .collect(Collectors.toList());
+
+      return begin.apply(Create.of(rows).withRowSchema(SCHEMA));
+    }
+
+    /** Always throws: this metadata table cannot be written to. */
+    @Override
+    public POutput buildIOWriter(PCollection<Row> input) {
+      throw new UnsupportedOperationException("Cannot write to SHOW CATALOGS");
+    }
+
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return PCollection.IsBounded.BOUNDED;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return SCHEMA;
+    }
+  }
+
+  /** Read-only table with one row ({@code NAME}) per database of a catalog. */
+  public static class DatabasesMetaTable extends BaseBeamTable {
+    private final Catalog catalog;
+    private final boolean currentOnly;
+    private static final Schema SCHEMA = Schema.builder().addStringField("NAME").build();
+
+    DatabasesMetaTable(Catalog catalog, boolean currentOnly) {
+      this.catalog = catalog;
+      this.currentOnly = currentOnly;
+    }
+
+    /**
+     * Reads the database names when this method is called and emits them as in-memory rows. With
+     * {@code currentOnly} set and no current database, the result is empty.
+     */
+    @Override
+    public PCollection<Row> buildIOReader(PBegin begin) {
+      Collection<String> databases;
+      if (currentOnly) {
+        @Nullable String currentDb = catalog.currentDatabase();
+        databases = currentDb != null ? Collections.singleton(currentDb) : Collections.emptyList();
+      } else {
+        databases = catalog.databases();
+      }
+      List<Row> rows =
+          databases.stream()
+              .map(db -> Row.withSchema(SCHEMA).addValues(db).build())
+              .collect(Collectors.toList());
+
+      return begin.apply(Create.of(rows).withRowSchema(SCHEMA));
+    }
+
+    /** Always throws: this metadata table cannot be written to. */
+    @Override
+    public POutput buildIOWriter(PCollection<Row> input) {
+      throw new UnsupportedOperationException("Cannot write to SHOW DATABASES");
+    }
+
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return PCollection.IsBounded.BOUNDED;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return SCHEMA;
+    }
+  }
+
+  /** Read-only table with one row ({@code NAME}, {@code TYPE}) per table of a database. */
+  public static class TablesMetaTable extends BaseBeamTable {
+    private final Catalog catalog;
+    private final String dbName;
+    private static final Schema SCHEMA =
+        Schema.builder().addStringField("NAME").addStringField("TYPE").build();
+
+    public TablesMetaTable(Catalog catalog, String dbName) {
+      this.catalog = catalog;
+      this.dbName = dbName;
+    }
+
+    /** Reads the database's tables from its meta store and emits them as in-memory rows. */
+    @Override
+    public PCollection<Row> buildIOReader(PBegin begin) {
+      // Note: This captures the state *at the moment of planning*
+      List<Row> rows =
+          catalog.metaStore(dbName).getTables().values().stream()
+              .map(
+                  table ->
+                      Row.withSchema(SCHEMA).addValues(table.getName(), table.getType()).build())
+              .collect(Collectors.toList());
+
+      return begin.apply(Create.of(rows).withRowSchema(SCHEMA));
+    }
+
+    /** Always throws: this metadata table cannot be written to. */
+    @Override
+    public POutput buildIOWriter(PCollection<Row> input) {
+      throw new UnsupportedOperationException("Cannot write to SHOW TABLES");
+    }
+
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return PCollection.IsBounded.BOUNDED;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return SCHEMA;
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
index db7724a4809..c387a5ace10 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.catalog;
 
+import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -50,6 +51,9 @@ public interface Catalog {
   @Nullable
   String currentDatabase();
 
+  /** Returns a collection of existing database names. */
+  Collection<String> databases();
+
   /**
    * Creates a database with this name.
    *
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
index 3c7ef5623b1..7c0d8b9d32e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -97,6 +98,11 @@ public class InMemoryCatalog implements Catalog {
     return currentDatabase;
   }
 
+  /**
+   * Returns a read-only view of the database names registered in this catalog.
+   *
+   * <p>The view is unmodifiable so callers cannot alter the catalog's internal state; databases must
+   * be added or removed through the catalog's own methods.
+   */
+  @Override
+  public Collection<String> databases() {
+    return Collections.unmodifiableCollection(databases);
+  }
+
   @Override
   public boolean dropDatabase(String database, boolean cascade) {
     checkState(!cascade, "%s does not support CASCADE.", 
getClass().getSimpleName());
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
index cca1bfd93f2..588caa78a2b 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
@@ -105,8 +105,8 @@ public class BeamSqlCliDatabaseTest {
     assertEquals(
         ImmutableSet.of("default"),
         
catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet()));
-    thrown.expect(CalciteContextException.class);
-    thrown.expectMessage("Cannot use catalog: 'my_catalog' not found.");
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Catalog 'my_catalog' not found");
     cli.execute("USE DATABASE my_catalog.my_database");
   }
 

Reply via email to