This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d119b29da2a [SQL] add SHOW command (#36509)
d119b29da2a is described below
commit d119b29da2a40d43b88134fff3859a5f409f914c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Dec 9 13:45:39 2025 -0500
[SQL] add SHOW command (#36509)
* add SHOW command
* add SHOW CURRENT
* add SHOW CURRENT
* add LIKE pattern
* spotless
* use SqlCall; remove @Nullable
* spotless
* minor fixes
* use PrintWriter and flush all at once
* correct PrintWriter usage
* cleanup
* fix name
* address comments
* spotless
---
.github/trigger_files/beam_PostCommit_SQL.json | 3 +-
.github/trigger_files/beam_PreCommit_SQL.json | 2 +-
.../sql/meta/provider/iceberg/IcebergCatalog.java | 6 +
.../extensions/sql/jdbc/BeamSqlLineShowTest.java | 306 +++++++++++++++++++++
.../extensions/sql/src/main/codegen/config.fmpp | 7 +
.../sql/src/main/codegen/includes/parserImpls.ftl | 191 +++++++++++++
.../sdk/extensions/sql/impl/BeamCalciteSchema.java | 4 +
.../sql/impl/BeamSystemDbMetadataSchema.java | 111 ++++++++
.../sdk/extensions/sql/impl/BeamSystemSchema.java | 155 +++++++++++
.../sql/impl/BeamSystemTableMetadataSchema.java | 126 +++++++++
.../extensions/sql/impl/CatalogManagerSchema.java | 34 ++-
.../sql/impl/parser/SqlCreateExternalTable.java | 2 +-
.../extensions/sql/impl/parser/SqlDdlNodes.java | 2 +-
.../extensions/sql/impl/parser/SqlUseDatabase.java | 2 +-
.../beam/sdk/extensions/sql/meta/SystemTables.java | 182 ++++++++++++
.../sdk/extensions/sql/meta/catalog/Catalog.java | 4 +
.../sql/meta/catalog/InMemoryCatalog.java | 6 +
.../sdk/extensions/sql/BeamSqlCliDatabaseTest.java | 4 +-
18 files changed, 1134 insertions(+), 13 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_SQL.json
b/.github/trigger_files/beam_PostCommit_SQL.json
index 3700163b299..5df3841d236 100644
--- a/.github/trigger_files/beam_PostCommit_SQL.json
+++ b/.github/trigger_files/beam_PostCommit_SQL.json
@@ -1,3 +1,4 @@
{
- "https://github.com/apache/beam/pull/36890": "fixing some null errors"
+ "comment": "Modify this file in a trivial way to cause this test suite to
run ",
+ "modification": 3
}
diff --git a/.github/trigger_files/beam_PreCommit_SQL.json
b/.github/trigger_files/beam_PreCommit_SQL.json
index 5abe02fc09c..ab4daeae234 100644
--- a/.github/trigger_files/beam_PreCommit_SQL.json
+++ b/.github/trigger_files/beam_PreCommit_SQL.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 1
+ "modification": 3
}
diff --git
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
index 0ca38824204..7dee72511e8 100644
---
a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
+++
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
@@ -19,6 +19,7 @@ package
org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
@@ -71,6 +72,11 @@ public class IcebergCatalog extends InMemoryCatalog {
return catalogConfig.createNamespace(database);
}
+ @Override
+ public Collection<String> databases() {
+ return catalogConfig.listNamespaces();
+ }
+
@Override
public void useDatabase(String database) {
checkArgument(databaseExists(database), "Database '%s' does not exist.");
diff --git
a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java
new file mode 100644
index 00000000000..0b593a1b2cf
--- /dev/null
+++
b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.jdbc;
+
+import static
org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.buildArgs;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class BeamSqlLineShowTest {
+ @Test
+ public void testShowTables() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE other_db",
+ "CREATE EXTERNAL TABLE other_db.should_not_show_up (id int, name
varchar) TYPE 'text'",
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "CREATE DATABASE my_catalog.my_db",
+ "USE DATABASE my_catalog.my_db",
+ "CREATE EXTERNAL TABLE my_table (id int, name varchar) TYPE
'text'",
+ "CREATE EXTERNAL TABLE my_other_table (col1 int, col2 timestamp)
TYPE 'text'",
+ "CREATE EXTERNAL TABLE my_other_table_with_a_long_name (foo
varchar, bar boolean) TYPE 'test'",
+ "SHOW TABLES");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ System.out.println(byteArrayOutputStream.toString("UTF-8"));
+ assertThat(
+ Arrays.asList(
+ "+------+------+",
+ "| NAME | TYPE |",
+ "+------+------+",
+ "| my_other_table | text |",
+ "| my_other_table_with_a_long_name | test |",
+ "| my_table | text |",
+ "+------+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowTablesInOtherDatabase() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE my_db",
+ "USE DATABASE my_db",
+ "CREATE EXTERNAL TABLE should_not_show_up (id int, name varchar)
TYPE 'text'",
+ "CREATE CATALOG other_catalog TYPE 'local'",
+ "CREATE DATABASE other_catalog.other_db",
+ "CREATE EXTERNAL TABLE other_catalog.other_db.other_table (id int,
name varchar) TYPE 'text'",
+ "SHOW TABLES IN other_catalog.other_db");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList(
+ "+------+------+",
+ "| NAME | TYPE |",
+ "+------+------+",
+ "| other_table | text |",
+ "+------+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowTablesWithPattern() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE my_db",
+ "USE DATABASE my_db",
+ "CREATE EXTERNAL TABLE my_table (id int, name varchar) TYPE
'text'",
+ "CREATE EXTERNAL TABLE my_table_2 (id int, name varchar) TYPE
'text'",
+ "CREATE EXTERNAL TABLE my_foo_table_1 (id int, name varchar) TYPE
'text'",
+ "CREATE EXTERNAL TABLE my_foo_table_2 (id int, name varchar) TYPE
'text'",
+ "SHOW TABLES LIKE '%foo%'");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList(
+ "+------+------+",
+ "| NAME | TYPE |",
+ "+------+------+",
+ "| my_foo_table_1 | text |",
+ "| my_foo_table_2 | text |",
+ "+------+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowCurrentDatabase() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE should_not_show_up",
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "USE CATALOG my_catalog",
+ "CREATE DATABASE my_db",
+ "CREATE DATABASE my_other_db",
+ "CREATE DATABASE my_database_that_has_a_very_long_name",
+ "USE DATABASE my_other_db",
+ "SHOW CURRENT database");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList("+------+", "| NAME |", "+------+", "| my_other_db |",
"+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowCurrentDatabaseWithNoneSet() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE should_not_show_up",
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "USE CATALOG my_catalog",
+ "DROP DATABASE `default`",
+ "SHOW CURRENT DATABASE");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList("+------+", "| NAME |", "+------+", "+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowDatabases() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE should_not_show_up",
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "USE CATALOG my_catalog",
+ "CREATE DATABASE my_db",
+ "CREATE DATABASE my_other_db",
+ "CREATE DATABASE my_database_that_has_a_very_long_name",
+ "SHOW DATABASES");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ System.out.println(byteArrayOutputStream.toString("UTF-8"));
+ assertThat(
+ Arrays.asList(
+ "+------+",
+ "| NAME |",
+ "+------+",
+ "| default |",
+ "| my_database_that_has_a_very_long_name |",
+ "| my_db |",
+ "| my_other_db |",
+ "+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowDatabasesInOtherCatalog() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE DATABASE should_not_show_up",
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "USE CATALOG my_catalog",
+ "CREATE DATABASE my_db",
+ "CREATE CATALOG my_other_catalog TYPE 'local'",
+ "CREATE DATABASE my_other_catalog.other_db",
+ "SHOW DATABASES FROM my_other_catalog");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList(
+ "+------+", "| NAME |", "+------+", "| default |", "| other_db |",
"+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowDatabasesWithPattern() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "CREATE DATABASE my_catalog.my_db",
+ "CREATE DATABASE my_catalog.other_db",
+ "CREATE DATABASE my_catalog.some_foo_db",
+ "CREATE DATABASE my_catalog.some_other_foo_db",
+ "SHOW DATABASES FROM my_catalog LIKE '%foo%'");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList(
+ "+------+",
+ "| NAME |",
+ "+------+",
+ "| some_foo_db |",
+ "| some_other_foo_db |",
+ "+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowCurrentCatalog() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "CREATE CATALOG my_very_long_catalog_name TYPE 'local'",
+ "SHOW CURRENT CATALOG");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList(
+ "+------+------+",
+ "| NAME | TYPE |",
+ "+------+------+",
+ "| default | local |",
+ "+------+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowCatalogs() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "CREATE CATALOG my_very_long_catalog_name TYPE 'local'",
+ "SHOW CATALOGS");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ System.out.println(byteArrayOutputStream.toString("UTF-8"));
+ assertThat(
+ Arrays.asList(
+ "+------+------+",
+ "| NAME | TYPE |",
+ "+------+------+",
+ "| default | local |",
+ "| my_catalog | local |",
+ "| my_very_long_catalog_name | local |",
+ "+------+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+
+ @Test
+ public void testShowCatalogsWithPattern() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ String[] args =
+ buildArgs(
+ "CREATE CATALOG my_catalog TYPE 'local'",
+ "CREATE CATALOG my_catalog_2 TYPE 'local'",
+ "CREATE CATALOG my_very_long_catalog_name TYPE 'local'",
+ "SHOW CATALOGS LIKE 'my_catalog%'");
+
+ BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null);
+
+ List<String> lines =
Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n"));
+ assertThat(
+ Arrays.asList(
+ "+------+------+",
+ "| NAME | TYPE |",
+ "+------+------+",
+ "| my_catalog | local |",
+ "| my_catalog_2 | local |",
+ "+------+------+"),
+ everyItem(is(oneOf(lines.toArray()))));
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 77772c5858e..73af7e18150 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -50,6 +50,9 @@ data: {
"TBLPROPERTIES"
"PROPERTIES"
"PARTITIONED"
+ "CATALOGS"
+ "DATABASES"
+ "TABLES"
"USE"
]
@@ -422,6 +425,10 @@ data: {
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
+ "SqlShowTables(Span.of())"
+ "SqlShowDatabases(Span.of())"
+ "SqlShowCatalogs(Span.of())"
+ "SqlShowCurrent(Span.of())"
"SqlUseCatalog(Span.of(), null)"
"SqlUseDatabase(Span.of(), null)"
"SqlSetOptionBeam(Span.of(), null)"
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index 46102c7b92f..d3bb8c2af56 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -264,6 +264,47 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
}
}
+/**
+ * SHOW CATALOGS [ LIKE regex_pattern ]
+ */
+SqlCall SqlShowCatalogs(Span s) :
+{
+ SqlNode regex = null;
+}
+{
+ <SHOW> <CATALOGS> { s.add(this); }
+ [ <LIKE> regex = StringLiteral() ]
+ {
+ List<String> path = new ArrayList<String>();
+ path.add("beamsystem");
+ path.add("catalogs");
+ SqlNodeList selectList =
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+ SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+ SqlNode where = null;
+ if (regex != null) {
+ SqlIdentifier nameIdentifier = new SqlIdentifier("NAME",
s.end(this));
+ where = SqlStdOperatorTable.LIKE.createCall(
+ s.end(this),
+ nameIdentifier, regex);
+ }
+
+ return new SqlSelect(
+ s.end(this),
+ null,
+ selectList,
+ from,
+ where,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+}
+
+
/**
* CREATE DATABASE ( IF NOT EXISTS )? ( catalog_name '.' )? database_name
*/
@@ -331,6 +372,98 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
}
}
+/**
+ * SHOW DATABASES [ ( FROM | IN )? catalog_name ] [LIKE regex_pattern ]
+ */
+SqlCall SqlShowDatabases(Span s) :
+{
+ SqlIdentifier catalogName = null;
+ SqlNode regex = null;
+}
+{
+ <SHOW> <DATABASES> { s.add(this); }
+ [ ( <FROM> | <IN> ) catalogName = SimpleIdentifier() ]
+ [ <LIKE> regex = StringLiteral() ]
+ {
+ List<String> path = new ArrayList<String>();
+ path.add("beamsystem");
+ path.add("databases");
+ SqlNodeList selectList =
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+ SqlNode where = null;
+ if (regex != null) {
+ SqlIdentifier nameIdentifier = new SqlIdentifier("NAME",
s.end(this));
+ where = SqlStdOperatorTable.LIKE.createCall(
+ s.end(this),
+ nameIdentifier, regex);
+ }
+ if (catalogName != null) {
+ path.add(catalogName.getSimple());
+ } else {
+ path.add("__current_catalog__");
+ }
+ SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+
+ return new SqlSelect(
+ s.end(this),
+ null,
+ selectList,
+ from,
+ where,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+}
+
+/**
+ * SHOW CURRENT ( CATALOG | DATABASE )
+ */
+SqlCall SqlShowCurrent(Span s) :
+{
+}
+{
+ <SHOW> <CURRENT> { s.add(this); }
+ {
+ List<String> path = new ArrayList<String>();
+ path.add("beamsystem");
+ }
+ (
+ <CATALOG> {
+ path.add("__current_catalog__");
+ }
+ |
+ <DATABASE> {
+ path.add("__current_database__");
+ }
+ )
+ {
+ if (path.size() != 2) {
+ throw new ParseException(
+ "Expected SHOW CURRENT CATALOG or SHOW CURRENT DATABASE");
+ }
+ SqlNodeList selectList =
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+ SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+
+ return new SqlSelect(
+ s.end(this),
+ null,
+ selectList,
+ from,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+}
+
SqlNodeList PartitionFieldList() :
{
@@ -456,6 +589,64 @@ SqlDrop SqlDropTable(Span s, boolean replace) :
}
}
+/**
+ * SHOW TABLES [ ( FROM | IN )? [ catalog_name '.' ] database_name ] [ LIKE
regex_pattern ]
+ */
+SqlCall SqlShowTables(Span s) :
+{
+ SqlIdentifier databaseCatalog = null;
+ SqlNode regex = null;
+}
+{
+ <SHOW> <TABLES> { s.add(this); }
+ [ (<FROM> | <IN>) databaseCatalog = CompoundIdentifier() ]
+ [ <LIKE> regex = StringLiteral() ]
+ {
+ List<String> path = new ArrayList<String>();
+ path.add("beamsystem");
+ path.add("tables");
+ SqlNodeList selectList =
SqlNodeList.of(SqlIdentifier.star(s.end(this)));
+ SqlNode where = null;
+ if (regex != null) {
+ SqlIdentifier nameIdentifier = new SqlIdentifier("NAME",
s.end(this));
+ where = SqlStdOperatorTable.LIKE.createCall(
+ s.end(this),
+ nameIdentifier, regex);
+ }
+ if (databaseCatalog != null) {
+ List<String> components = databaseCatalog.names;
+ if (components.size() == 1) {
+ path.add("__current_catalog__");
+ path.add(components.get(0));
+ } else if (components.size() == 2) {
+ path.addAll(components);
+ } else {
+ throw new ParseException(
+ "SHOW TABLES FROM/IN accepts at most a catalog name and a
database name.");
+ }
+ } else {
+ path.add("__current_catalog__");
+ path.add("__current_database__");
+ }
+ SqlIdentifier from = new SqlIdentifier(path, s.end(this));
+
+ return new SqlSelect(
+ s.end(this),
+ null,
+ selectList,
+ from,
+ where,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+}
+
+
Schema.FieldType FieldType() :
{
final SqlTypeName collectionTypeName;
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
index 6ef6e82e6a7..f7783e7c3ec 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
@@ -129,6 +129,10 @@ public class BeamCalciteSchema implements Schema {
connection.getPipelineOptions());
}
+ public Collection<Table> getTables() {
+ return tableProvider.getTables().values();
+ }
+
@Override
public Set<String> getFunctionNames() {
return Collections.emptySet();
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java
new file mode 100644
index 00000000000..66c05a35313
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.meta.SystemTables;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** A Calcite {@link Schema} responsible for {@code SHOW DATABASES} requests.
*/
+public class BeamSystemDbMetadataSchema implements Schema {
+ private final CatalogManager catalogManager;
+
+ BeamSystemDbMetadataSchema(CatalogManager catalogManager) {
+ this.catalogManager = catalogManager;
+ }
+
+ @Override
+ public @Nullable Table getTable(String catalogName) {
+ Catalog catalog;
+ if (catalogName.equals("__current_catalog__")) {
+ catalog = catalogManager.currentCatalog();
+ } else {
+ catalog =
+ checkArgumentNotNull(
+ catalogManager.getCatalog(catalogName), "Catalog '%s' does not
exist.", catalogName);
+ }
+
+ return BeamCalciteTable.of(SystemTables.databases(catalog, false));
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return
catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet());
+ }
+
+ @Override
+ public @Nullable Schema getSubSchema(@Nullable String name) {
+ return null;
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getTypeNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public @Nullable RelProtoDataType getType(String s) {
+ return null;
+ }
+
+ @Override
+ public Collection<Function> getFunctions(String s) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getFunctionNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) {
+ return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s,
getClass());
+ }
+
+ @Override
+ public boolean isMutable() {
+ return true;
+ }
+
+ @Override
+ public Schema snapshot(SchemaVersion schemaVersion) {
+ return this;
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java
new file mode 100644
index 00000000000..c9f7c417ca9
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.SystemTables;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A Calcite {@link Schema} specialized for displaying the session's metadata.
Top node that manages
+ * requests to {@code SHOW} {@code CATALOGS}, {@code DATABASES}, and {@code
TABLES}. Used by {@link
+ * CatalogManagerSchema}.
+ *
+ * <p>{@code SHOW} requests are treated as aliases, listed below:
+ *
+ * <ul>
+ * <li>{@code SHOW CURRENT CATALOG} --> {@code SELECT * FROM
`beamsystem`.`__current_catalog__`}
+ * <li>{@code SHOW CATALOGS} --> {@code SELECT * FROM
`beamsystem`.`catalogs`}
+ * <li>{@code SHOW CATALOGS LIKE '{pattern}'} --> {@code SELECT * FROM
`beamsystem`.`catalogs`
+ * WHERE NAME LIKE '{pattern}'}
+ * <li>{@code SHOW CURRENT DATABASE} --> {@code SELECT * FROM
`beamsystem`.`__current_database__`}
+ * <li>{@code SHOW DATABASES} --> {@code SELECT * FROM
+ * `beamsystem`.`databases`.`__current_catalog__`}
+ * <li>{@code SHOW DATABASES FROM my_catalog} --> {@code SELECT * FROM
+ * `beamsystem`.`databases`.`my_catalog`}
+ * <li>{@code SHOW DATABASES FROM my_catalog LIKE '{pattern}'} --> {@code
SELECT * FROM
+ * `beamsystem`.`databases`.`my_catalog` WHERE NAME LIKE '{pattern}'}
+ * <li>{@code SHOW TABLES} --> {@code SELECT * FROM
+ * `beamsystem`.`tables`.`__current_catalog__`.`__current_database__`}
+ * <li>{@code SHOW TABLES FROM my_db} --> {@code SELECT * FROM
+ * `beamsystem`.`tables`.`__current_catalog__`.`my_db`}
+ * <li>{@code SHOW TABLES FROM my_catalog.my_db} --> {@code SELECT * FROM
+ * `beamsystem`.`tables`.`my_catalog`.`my_db`}
+ * <li>{@code SHOW TABLES FROM my_catalog.my_db LIKE '{pattern}'} --> {@code
SELECT * FROM
+ * `beamsystem`.`tables`.`my_catalog`.`my_db` WHERE NAME LIKE
'{pattern}'}
+ * </ul>
+ */
+public class BeamSystemSchema implements Schema {
+ private final CatalogManager catalogManager;
+ private final BeamSystemDbMetadataSchema dbSchema;
+ private final BeamSystemTableMetadataSchema tableSchema;
+ public static final String BEAMSYSTEM = "beamsystem";
+ private static final String CATALOGS = "catalogs";
+ private static final String DATABASES = "databases";
+ private static final String TABLES = "tables";
+
+ BeamSystemSchema(CatalogManager catalogManager) {
+ this.catalogManager = catalogManager;
+ this.dbSchema = new BeamSystemDbMetadataSchema(catalogManager);
+ this.tableSchema = new BeamSystemTableMetadataSchema(catalogManager, null);
+ }
+
+ @Override
+ public @Nullable Table getTable(String table) {
+ switch (table) {
+ case CATALOGS:
+ return BeamCalciteTable.of(SystemTables.catalogs(catalogManager,
false));
+ case "__current_catalog__":
+ return BeamCalciteTable.of(SystemTables.catalogs(catalogManager,
true));
+ case "__current_database__":
+ return
BeamCalciteTable.of(SystemTables.databases(catalogManager.currentCatalog(),
true));
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return ImmutableSet.of(CATALOGS);
+ }
+
+ @Override
+ public @Nullable Schema getSubSchema(@Nullable String name) {
+ if (name == null) {
+ return null;
+ }
+ switch (name) {
+ case DATABASES:
+ return dbSchema;
+ case TABLES:
+ return tableSchema;
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return ImmutableSet.of(DATABASES, TABLES);
+ }
+
+ @Override
+ public Set<String> getTypeNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public @Nullable RelProtoDataType getType(String s) {
+ return null;
+ }
+
+ @Override
+ public Collection<Function> getFunctions(String s) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getFunctionNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) {
+ return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s,
getClass());
+ }
+
+ @Override
+ public boolean isMutable() {
+ return true;
+ }
+
+ @Override
+ public Schema snapshot(SchemaVersion schemaVersion) {
+ return this;
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java
new file mode 100644
index 00000000000..b081a1b886c
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.SystemTables;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** A Calcite {@link Schema} responsible for {@code SHOW TABLES} requests. */
+public class BeamSystemTableMetadataSchema implements Schema {
+ private final CatalogManager catalogManager;
+ private final @MonotonicNonNull String catalog;
+
+ BeamSystemTableMetadataSchema(CatalogManager catalogManager, @Nullable
String catalog) {
+ this.catalogManager = catalogManager;
+ this.catalog = catalog;
+ }
+
+ @Override
+ public @Nullable Table getTable(String dbName) {
+ // returns a table if this instance has a catalog referenced
+ if (catalog == null) {
+ return null;
+ }
+
+ Catalog cat =
+ checkArgumentNotNull(
+ catalogManager.getCatalog(catalog), "Catalog '%s' does not
exist.", catalog);
+ if (dbName.equals("__current_database__")) {
+ dbName =
+ checkStateNotNull(
+ cat.currentDatabase(),
+ "Catalog '%s' has not set a default database. Please specify
one.");
+ }
+ return BeamCalciteTable.of(SystemTables.tables(cat, dbName));
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public @Nullable Schema getSubSchema(@Nullable String catalogName) {
+ // if this is a top instance (i.e. no catalog reference), return child
schema with the specified
+ // catalog referenced
+ if (catalog == null && catalogName != null) {
+ if (catalogName.equals("__current_catalog__")) {
+ catalogName = catalogManager.currentCatalog().name();
+ }
+ return new BeamSystemTableMetadataSchema(catalogManager, catalogName);
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getTypeNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public @Nullable RelProtoDataType getType(String s) {
+ return null;
+ }
+
+ @Override
+ public Collection<Function> getFunctions(String s) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getFunctionNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) {
+ return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s,
getClass());
+ }
+
+ @Override
+ public boolean isMutable() {
+ return true;
+ }
+
+ @Override
+ public Schema snapshot(SchemaVersion schemaVersion) {
+ return this;
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
index ec225efc1c3..098b72b2869 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
@@ -43,6 +44,7 @@ import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifi
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,11 +57,13 @@ public class CatalogManagerSchema implements Schema {
private static final Logger LOG =
LoggerFactory.getLogger(CatalogManagerSchema.class);
private final JdbcConnection connection;
private final CatalogManager catalogManager;
+ private final BeamSystemSchema beamSystemSchema;
private final Map<String, CatalogSchema> catalogSubSchemas = new HashMap<>();
CatalogManagerSchema(JdbcConnection jdbcConnection, CatalogManager
catalogManager) {
this.connection = jdbcConnection;
this.catalogManager = catalogManager;
+ this.beamSystemSchema = new BeamSystemSchema(catalogManager);
}
@VisibleForTesting
@@ -176,15 +180,23 @@ public class CatalogManagerSchema implements Schema {
return getCurrentCatalogSchema().getTableNames();
}
+ /**
+ * Returns the {@link CatalogSchema} for the catalog referenced in this
{@link TableName}. If the
+ * path does not reference a catalog, the currently use {@link
CatalogSchema} will be returned.
+ */
public CatalogSchema getCatalogSchema(TableName tablePath) {
- @Nullable Schema catalogSchema = getSubSchema(tablePath.catalog());
- if (catalogSchema == null) {
- catalogSchema = getCurrentCatalogSchema();
- }
+ return tablePath.catalog() != null
+ ? getCatalogSchema(tablePath.catalog())
+ : getCurrentCatalogSchema();
+ }
+
+ public CatalogSchema getCatalogSchema(@Nullable String catalog) {
+ Schema catalogSchema =
+ checkArgumentNotNull(getSubSchema(catalog), "Catalog '%s' not found.",
catalog);
Preconditions.checkState(
catalogSchema instanceof CatalogSchema,
"Unexpected Schema type for Catalog '%s': %s",
- tablePath.catalog(),
+ catalog,
catalogSchema.getClass());
return (CatalogSchema) catalogSchema;
}
@@ -202,6 +214,9 @@ public class CatalogManagerSchema implements Schema {
if (name == null) {
return null;
}
+ if (name.equals(BeamSystemSchema.BEAMSYSTEM)) {
+ return beamSystemSchema;
+ }
@Nullable CatalogSchema catalogSchema = catalogSubSchemas.get(name);
if (catalogSchema == null) {
@Nullable Catalog catalog = catalogManager.getCatalog(name);
@@ -222,7 +237,14 @@ public class CatalogManagerSchema implements Schema {
@Override
public Set<String> getSubSchemaNames() {
- return
catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet());
+ return ImmutableSet.<String>builder()
+
.addAll(catalogs().stream().map(Catalog::name).collect(Collectors.toSet()))
+ .add(BeamSystemSchema.BEAMSYSTEM)
+ .build();
+ }
+
+ public Collection<Catalog> catalogs() {
+ return catalogManager.catalogs();
}
public void setPipelineOption(String key, String value) {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
index ab644145b4f..de7903897b6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
@@ -159,7 +159,7 @@ public class SqlCreateExternalTable extends SqlCreate
implements BeamSqlParser.E
CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema)
schema;
catalogManagerSchema.maybeRegisterProvider(pathOverride,
SqlDdlNodes.getString(type));
- CatalogSchema catalogSchema =
catalogManagerSchema.getCatalogSchema(pathOverride);
+ CatalogSchema catalogSchema = ((CatalogManagerSchema)
schema).getCatalogSchema(pathOverride);
beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
} else if (schema instanceof BeamCalciteSchema) {
beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index c5d162ebbb6..6f4d8ee79d9 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -85,7 +85,7 @@ public class SqlDdlNodes {
}
}
- static @Nullable String getString(SqlNode n) {
+ static @Nullable String getString(@Nullable SqlNode n) {
if (n == null) {
return null;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
index 9d06e471dbb..f0e3fa59ddc 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
@@ -78,7 +78,7 @@ public class SqlUseDatabase extends SqlSetOption implements
BeamSqlParser.Execut
}
CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
- CatalogSchema catalogSchema =
catalogManagerSchema.getCatalogSchema(pathOverride);
+ CatalogSchema catalogSchema = ((CatalogManagerSchema)
schema).getCatalogSchema(pathOverride);
// if database exists in a different catalog, we need to also switch to
that catalog
if (pathOverride.catalog() != null
&& !pathOverride
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java
new file mode 100644
index 00000000000..8e91e9eb030
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Provides {@link BeamSqlTable}s that track metadata around catalogs,
databases, and tables. For
+ * now, it tracks the following:
+ *
+ * <ul>
+ * <li>Catalogs: Name and Type
+ * <li>Databases: Name
+ * <li>Tables: Name and Type
+ * </ul>
+ */
+public class SystemTables {
+ public static CatalogsMetaTable catalogs(CatalogManager catalogManager,
boolean currentOnly) {
+ return new CatalogsMetaTable(catalogManager, currentOnly);
+ }
+
+ public static DatabasesMetaTable databases(Catalog catalog, boolean
currentOnly) {
+ return new DatabasesMetaTable(catalog, currentOnly);
+ }
+
+ public static TablesMetaTable tables(Catalog catalog, String dbName) {
+ return new TablesMetaTable(catalog, dbName);
+ }
+
+ public static class CatalogsMetaTable extends BaseBeamTable {
+ private final CatalogManager catalogManager;
+ private final boolean currentOnly;
+
+ private static final Schema SCHEMA =
+ Schema.builder().addStringField("NAME").addStringField("TYPE").build();
+
+ public CatalogsMetaTable(CatalogManager catalogManager, boolean
currentOnly) {
+ this.catalogManager = catalogManager;
+ this.currentOnly = currentOnly;
+ }
+
+ @Override
+ public PCollection<Row> buildIOReader(PBegin begin) {
+ Collection<Catalog> catalogs =
+ currentOnly
+ ? ImmutableList.of(catalogManager.currentCatalog())
+ : catalogManager.catalogs();
+ List<Row> rows =
+ catalogs.stream()
+ .map(cat -> Row.withSchema(SCHEMA).addValues(cat.name(),
cat.type()).build())
+ .collect(Collectors.toList());
+
+ return begin.apply(Create.of(rows).withRowSchema(SCHEMA));
+ }
+
+ @Override
+ public POutput buildIOWriter(PCollection<Row> input) {
+ throw new UnsupportedOperationException("Cannot write to SHOW CATALOGS");
+ }
+
+ @Override
+ public PCollection.IsBounded isBounded() {
+ return PCollection.IsBounded.BOUNDED;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+ }
+
+ public static class DatabasesMetaTable extends BaseBeamTable {
+ private final Catalog catalog;
+ private final boolean currentOnly;
+ private static final Schema SCHEMA =
Schema.builder().addStringField("NAME").build();
+
+ DatabasesMetaTable(Catalog catalog, boolean currentOnly) {
+ this.catalog = catalog;
+ this.currentOnly = currentOnly;
+ }
+
+ @Override
+ public PCollection<Row> buildIOReader(PBegin begin) {
+ Collection<String> databases;
+ if (currentOnly) {
+ @Nullable String currentDb = catalog.currentDatabase();
+ databases = currentDb != null ? Collections.singleton(currentDb) :
Collections.emptyList();
+ } else {
+ databases = catalog.databases();
+ }
+ List<Row> rows =
+ databases.stream()
+ .map(db -> Row.withSchema(SCHEMA).addValues(db).build())
+ .collect(Collectors.toList());
+
+ return begin.apply(Create.of(rows).withRowSchema(SCHEMA));
+ }
+
+ @Override
+ public POutput buildIOWriter(PCollection<Row> input) {
+ throw new UnsupportedOperationException("Cannot write to SHOW
DATABASES");
+ }
+
+ @Override
+ public PCollection.IsBounded isBounded() {
+ return PCollection.IsBounded.BOUNDED;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+ }
+
+ public static class TablesMetaTable extends BaseBeamTable {
+ private final Catalog catalog;
+ private final String dbName;
+ private static final Schema SCHEMA =
+ Schema.builder().addStringField("NAME").addStringField("TYPE").build();
+
+ public TablesMetaTable(Catalog catalog, String dbName) {
+ this.catalog = catalog;
+ this.dbName = dbName;
+ }
+
+ @Override
+ public PCollection<Row> buildIOReader(PBegin begin) {
+ // Note: This captures the state *at the moment of planning*
+ List<Row> rows =
+ catalog.metaStore(dbName).getTables().values().stream()
+ .map(
+ table ->
+ Row.withSchema(SCHEMA).addValues(table.getName(),
table.getType()).build())
+ .collect(Collectors.toList());
+
+ return begin.apply(Create.of(rows).withRowSchema(SCHEMA));
+ }
+
+ @Override
+ public POutput buildIOWriter(PCollection<Row> input) {
+ throw new UnsupportedOperationException("Cannot write to SHOW TABLES");
+ }
+
+ @Override
+ public PCollection.IsBounded isBounded() {
+ return PCollection.IsBounded.BOUNDED;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
index db7724a4809..c387a5ace10 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.catalog;
+import java.util.Collection;
import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -50,6 +51,9 @@ public interface Catalog {
@Nullable
String currentDatabase();
+ /** Returns a collection of existing database names. */
+ Collection<String> databases();
+
/**
* Creates a database with this name.
*
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
index 3c7ef5623b1..7c0d8b9d32e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -97,6 +98,11 @@ public class InMemoryCatalog implements Catalog {
return currentDatabase;
}
+ @Override
+ public Collection<String> databases() {
+ return databases;
+ }
+
@Override
public boolean dropDatabase(String database, boolean cascade) {
checkState(!cascade, "%s does not support CASCADE.",
getClass().getSimpleName());
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
index cca1bfd93f2..588caa78a2b 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java
@@ -105,8 +105,8 @@ public class BeamSqlCliDatabaseTest {
assertEquals(
ImmutableSet.of("default"),
catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet()));
- thrown.expect(CalciteContextException.class);
- thrown.expectMessage("Cannot use catalog: 'my_catalog' not found.");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Catalog 'my_catalog' not found");
cli.execute("USE DATABASE my_catalog.my_database");
}