This is an automated email from the ASF dual-hosted git repository. vterentev pushed a commit to branch fix-sqlbigquery in repository https://gitbox.apache.org/repos/asf/beam.git
commit 70aa86b961177150db3e1a9eaddcacd2b2dc6621 Author: Vitaly Terentyev <[email protected]> AuthorDate: Mon Jul 14 21:05:40 2025 +0400 Revert #35314 --- .github/trigger_files/beam_PostCommit_SQL.json | 2 +- .../extensions/sql/src/main/codegen/config.fmpp | 7 ++---- .../sql/src/main/codegen/includes/parserImpls.ftl | 8 +++---- .../{SqlUseCatalog.java => SqlSetCatalog.java} | 12 +++++----- .../beam/sdk/extensions/sql/BeamSqlCliTest.java | 14 ++++++------ .../sdk/extensions/sql/PubsubToBigqueryIT.java | 2 +- .../beam/sdk/extensions/sql/PubsubToIcebergIT.java | 2 +- .../meta/provider/iceberg/IcebergReadWriteIT.java | 4 ++-- .../provider/iceberg/IcebergTableProviderTest.java | 26 ++++++++++++++++++++++ 9 files changed, 50 insertions(+), 27 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 833fd9b0d17..5df3841d236 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 2 + "modification": 3 } diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp index 623a3e2792f..a51d0125a3f 100644 --- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp +++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp @@ -30,7 +30,7 @@ data: { "org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateFunction" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropCatalog" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes" - "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseCatalog" + "org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetCatalog" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam" "org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils" "org.apache.beam.sdk.schemas.Schema" @@ -46,7 +46,6 @@ data: { "TBLPROPERTIES" "PROPERTIES" "PARTITIONED" - "USE" ] # List of keywords from "keywords" section that are not reserved. @@ -371,8 +370,6 @@ data: { "LOCATION" "TBLPROPERTIES" "PROPERTIES" - "PARTITIONED" - "USE" ] # List of non-reserved keywords to add; @@ -394,7 +391,7 @@ data: { # Return type of method implementation should be 'SqlNode'. # Example: SqlShowDatabases(), SqlShowTables(). statementParserMethods: [ - "SqlUseCatalog(Span.of(), null)" + "SqlSetCatalog(Span.of(), null)" "SqlSetOptionBeam(Span.of(), null)" ] diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl index 450c6eeaff7..78940ee6938 100644 --- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -213,14 +213,14 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) : } /** - * USE CATALOG catalog_name + * SET CATALOG catalog_name */ -SqlCall SqlUseCatalog(Span s, String scope) : +SqlCall SqlSetCatalog(Span s, String scope) : { final SqlNode catalogName; } { - <USE> { + <SET> { s.add(this); } <CATALOG> @@ -230,7 +230,7 @@ SqlCall SqlUseCatalog(Span s, String scope) : catalogName = SimpleIdentifier() ) { - return new SqlUseCatalog( + return new SqlSetCatalog( s.end(this), scope, catalogName); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetCatalog.java similarity index 91% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseCatalog.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetCatalog.java index 7088c718302..208b389d3c3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetCatalog.java @@ -39,13 +39,13 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SqlUseCatalog extends SqlSetOption implements BeamSqlParser.ExecutableStatement { - private static final Logger LOG = LoggerFactory.getLogger(SqlUseCatalog.class); +public class SqlSetCatalog extends SqlSetOption implements BeamSqlParser.ExecutableStatement { + private static final Logger LOG = LoggerFactory.getLogger(SqlSetCatalog.class); private final SqlIdentifier catalogName; - private static final SqlOperator OPERATOR = new SqlSpecialOperator("USE CATALOG", SqlKind.OTHER); + private static final SqlOperator OPERATOR = new SqlSpecialOperator("SET CATALOG", SqlKind.OTHER); - public SqlUseCatalog(SqlParserPos pos, String scope, SqlNode catalogName) { + public SqlSetCatalog(SqlParserPos pos, String scope, SqlNode catalogName) { super(pos, scope, SqlDdlNodes.getIdentifier(catalogName, pos), null); this.catalogName = SqlDdlNodes.getIdentifier(catalogName, pos); } @@ -79,13 +79,13 @@ public class SqlUseCatalog extends SqlSetOption implements BeamSqlParser.Executa catalogName.getParserPosition(), RESOURCE.internal( String.format( - "Unexpected 'USE CATALOG' call for Schema '%s' that is not a Catalog.", name))); + "Unexpected 'SET CATALOG' call for Schema '%s' that is not a Catalog.", name))); } if (catalogManager.getCatalog(name) == null) { throw SqlUtil.newContextException( catalogName.getParserPosition(), - RESOURCE.internal(String.format("Cannot use catalog: '%s' not found.", name))); + RESOURCE.internal(String.format("Cannot set catalog: '%s' not found.", name))); } if (catalogManager.currentCatalog().name().equals(name)) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java index b8aa030649a..dcbb0cfec09 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java @@ -310,8 +310,8 @@ public class BeamSqlCliTest { BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager); thrown.expect(CalciteContextException.class); - thrown.expectMessage("Cannot use catalog: 'my_catalog' not found."); - cli.execute("USE CATALOG my_catalog"); + thrown.expectMessage("Cannot set catalog: 'my_catalog' not found."); + cli.execute("SET CATALOG my_catalog"); } @Test @@ -344,15 +344,15 @@ public class BeamSqlCliTest { // catalog manager always starts with a "default" catalog assertEquals("default", catalogManager.currentCatalog().name()); - cli.execute("USE CATALOG catalog_1"); + cli.execute("SET CATALOG catalog_1"); assertEquals("catalog_1", catalogManager.currentCatalog().name()); assertEquals(catalog1Props, catalogManager.currentCatalog().properties()); - cli.execute("USE CATALOG catalog_2"); + cli.execute("SET CATALOG catalog_2"); assertEquals("catalog_2", catalogManager.currentCatalog().name()); assertEquals(catalog2Props, catalogManager.currentCatalog().properties()); // DEFAULT is a reserved keyword, so need to encapsulate in backticks - cli.execute("USE CATALOG 'default'"); + cli.execute("SET CATALOG 'default'"); assertEquals("default", catalogManager.currentCatalog().name()); } @@ -405,7 +405,7 @@ public class BeamSqlCliTest { BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager); cli.execute("CREATE CATALOG my_catalog TYPE 'local'"); - cli.execute("USE CATALOG my_catalog"); + cli.execute("SET CATALOG my_catalog"); cli.execute( "CREATE EXTERNAL TABLE person (\n" + "id int, name varchar, age int) \n" + "TYPE 'text'"); @@ -413,7 +413,7 @@ public class BeamSqlCliTest { assertNotNull(catalogManager.currentCatalog().metaStore().getTables().get("person")); cli.execute("CREATE CATALOG my_other_catalog TYPE 'local'"); - cli.execute("USE CATALOG my_other_catalog"); + cli.execute("SET CATALOG my_other_catalog"); assertEquals("my_other_catalog", catalogManager.currentCatalog().name()); assertNull(catalogManager.currentCatalog().metaStore().getTables().get("person")); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java index 54be98adbc1..c8d843bda9e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java @@ -53,7 +53,7 @@ public class PubsubToBigqueryIT implements Serializable { @Test public void testSimpleInsert() throws Exception { String createCatalog = "CREATE CATALOG my_catalog TYPE `local`"; - String setCatalog = "USE CATALOG my_catalog"; + String setCatalog = "SET CATALOG my_catalog"; String pubsubTableString = "CREATE EXTERNAL TABLE pubsub_topic (\n" + "event_timestamp TIMESTAMP, \n" diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java index 7343c9b9a52..7fc6a00a82a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java @@ -100,7 +100,7 @@ public class PubsubToIcebergIT implements Serializable { + format(" 'warehouse' = '%s', \n", warehouse) + format(" 'gcp_project' = '%s', \n", OPTIONS.getProject()) + " 'gcp_region' = 'us-central1')"; - setCatalogDdl = "USE CATALOG my_catalog"; + setCatalogDdl = "SET CATALOG my_catalog"; icebergCatalog = CatalogUtil.loadCatalog( BQMS_CATALOG, diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java index 15fe4769c61..b65fd004ec9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java @@ -154,7 +154,7 @@ public class IcebergReadWriteIT { sqlEnv.executeDdl(createCatalog); // 2) use the catalog we just created - String setCatalog = "USE CATALOG my_catalog"; + String setCatalog = "SET CATALOG my_catalog"; sqlEnv.executeDdl(setCatalog); // 3) create beam table @@ -283,7 +283,7 @@ public class IcebergReadWriteIT { sqlEnv.executeDdl(createCatalog); // 2) use the catalog we just created - String setCatalog = "USE CATALOG my_catalog"; + String setCatalog = "SET CATALOG my_catalog"; sqlEnv.executeDdl(setCatalog); // 3) create Beam table diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java index d829ee3bed0..3cb271207cd 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java @@ -69,6 +69,32 @@ public class IcebergTableProviderTest { assertEquals(provider.catalogConfig, icebergTable.catalogConfig); } + @Test + public void testBuildBeamSqlTableWithPartitionFields() { + List<String> partitionFields = ImmutableList.of("id", "truncate(name, 3)"); + InMemoryCatalogManager catalogManager = new InMemoryCatalogManager(); + BeamSqlEnv sqlEnv = + BeamSqlEnv.builder(catalogManager) + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); + + sqlEnv.executeDdl("CREATE CATALOG my_catalog TYPE iceberg"); + sqlEnv.executeDdl("SET CATALOG my_catalog"); + sqlEnv.executeDdl( + "CREATE EXTERNAL TABLE test_partitioned_table(\n" + + " id INTEGER,\n" + + " name VARCHAR) \n" + + "TYPE 'iceberg' \n" + + "PARTITIONED BY ('id', 'truncate(name, 3)') \n" + + "LOCATION 'namespace.test_partitioned_table'"); + + Table result = catalogManager.currentCatalog().metaStore().getTable("test_partitioned_table"); + Table expected = + fakeTableBuilder("test_partitioned_table").partitionFields(partitionFields).build(); + + assertEquals(expected, result); + } + private static Table.Builder fakeTableBuilder(String name) { return Table.builder() .name(name)
